
Flink: From Introduction to Zhenxiang (3. Reading Data from Collections and Files)

2020-11-08 12:06:35 open_neocf7df

For reference, see: https://blog.51cto.com/mapengfei/2546985

Reading data from a collection

Create a new package, com.mafei.apitest, and in it create a new Scala object:

package com.mafei.apitest

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// A sensor reading: sensor id, timestamp, temperature

case class SensorReading(id: String, timestamp: Long, temperature: Double)
object SourceTest {

  def main(args: Array[String]): Unit = {
    // Create an execution environment 
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. Read data from a collection
    val dataList = List(
      SensorReading("sensor1",1603766281,41),
      SensorReading("sensor2",1603766282,42),
      SensorReading("sensor3",1603766283,43),
      SensorReading("sensor4",1603766284,44)
    )

    val stream1 = env.fromCollection(dataList)

    stream1.print()

    // Execute the job
    env.execute(" source test")
  }

}

Project directory structure: [screenshot not preserved]
Output:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2> SensorReading(sensor3,1603766283,43.0)
4> SensorReading(sensor1,1603766281,41.0)
3> SensorReading(sensor4,1603766284,44.0)
1> SensorReading(sensor2,1603766282,42.0)
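
The `N>` prefix in the output above identifies the parallel subtask that printed each record, which is why the records do not appear in list order. As a minimal sketch, assuming you want in-order output while experimenting locally, you can set the parallelism of the environment to 1. The object name `SourceTestSingleThread` and the sample strings are hypothetical and not part of the original project:

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical object name, used only for this illustration
object SourceTestSingleThread {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Run every operator, including the print sink, with a single subtask
    env.setParallelism(1)

    // Sample data made up for this sketch
    val stream = env.fromCollection(List("sensor1", "sensor2", "sensor3", "sensor4"))

    stream.print()

    // Execute the job
    env.execute("source test with parallelism 1")
  }
}
```

With a single subtask the records should be printed in list order. For anything beyond local experiments you would normally keep the default parallelism.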

Reading data from a file

As in the first step, create a new Scala object in the com.mafei.apitest package:

package com.mafei.apitest

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// A sensor reading: sensor id, timestamp, temperature

case class SensorReading(id: String,timestamp: Long, temperature: Double)
object SourceTest {

  def main(args: Array[String]): Unit = {
    // Create an execution environment 
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Reading data from a file 
    val stream2 = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")

    stream2.print()

    // Execute the job
    env.execute(" source test")
  }

}

In the resources directory, create a new file, sensor.txt, with the following content:
sensor1,1603766281,41
sensor2,1603766282,42
sensor3,1603766283,43
sensor4,1603766284,44



Project directory structure: [screenshot not preserved]

Output:
1> sensor1,1603766281,41
1> sensor2,1603766282,42
2> sensor3,1603766283,43
3> sensor4,1603766284,44
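
Note that readTextFile yields plain strings, so the output above is the raw lines of the file, not SensorReading objects. A minimal sketch of converting each line into a SensorReading, assuming every line has exactly the three comma-separated fields shown in sensor.txt (the helper `SensorParser.parseLine` is hypothetical and not part of the original post):

```scala
// Same case class as in the listings above
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// Hypothetical helper, introduced only for this illustration
object SensorParser {

  // Turns a line such as "sensor1,1603766281,41" into a SensorReading.
  // Assumes well-formed input; a malformed line would throw an exception.
  def parseLine(line: String): SensorReading = {
    val fields = line.split(",").map(_.trim)
    SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
  }
}
```

Inside the job, something like `stream2.map(line => SensorParser.parseLine(line))` would then give a stream of SensorReading values, relying on the createTypeInformation import already present in the listing.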



Copyright notice
This article was written by [open_neocf7df]. Please include a link to the original when reposting. Thank you.