
Detailed explanation of common API of Flink

2021-06-23 21:49:43 Baoge


<font color=red size=50>There is also a video walkthrough on my Bilibili channel (Baoge chbxw); your support is appreciated, thank you.</font>

Foreword: the layering of Flink's APIs

Flink offers three APIs at different levels of abstraction. Each API makes a different trade-off between conciseness and expressiveness, and targets different application scenarios.

  • ProcessFunction is the lowest-level interface Flink provides. A ProcessFunction can handle individual events from one or two input streams, or events grouped into a specific window. It offers fine-grained control over time and state: a developer can modify state freely and register timers that trigger callback functions at some point in the future. ProcessFunction is therefore suited to implementing the many stateful, event-driven applications whose complex business logic depends on individual events.
  • The DataStream API provides primitives for many common stream-processing operations, such as windowing, record-at-a-time transformations, and enriching events by querying an external data store. The DataStream API is available for Java and Scala and offers predefined functions such as map(), reduce(), and aggregate(). Custom functions can be defined by implementing the predefined interfaces or by using Java or Scala lambda expressions.
  • SQL & Table API: Flink supports two relational APIs, the Table API and SQL. Both are unified APIs for batch and stream processing: a relational query is executed with the same semantics, and produces the same result, whether it runs over an unbounded real-time stream or a bounded historical data set. The Table API and SQL use Apache Calcite for query parsing, validation, and optimization. They integrate seamlessly with the DataStream and DataSet APIs and support user-defined scalar, aggregate, and table-valued functions.
  • Extension libraries

    • Complex event processing (CEP): pattern detection is a very common use case in event-stream processing. Flink's CEP library provides an API for specifying event patterns, much like regular expressions or state machines. The CEP library integrates with Flink's DataStream API so that patterns are evaluated on a DataStream. Applications of the CEP library include network intrusion detection, business process monitoring, and fraud detection.
    • DataSet API: the DataSet API is Flink's core API for batch applications. Its basic operators include map, reduce, (outer) join, co-group, and iterate. All data structures and algorithms operate on serialized data in memory; if the data size exceeds the reserved memory, the excess is spilled to disk. The data-processing algorithms behind Flink's DataSet API are inspired by traditional database algorithms such as the hybrid hash-join and the external merge-sort.
    • Gelly: Gelly is a library for scalable graph processing and analysis. It is implemented on top of, and integrated with, the DataSet API, so it benefits from the DataSet API's scalable and robust operators. Gelly provides built-in algorithms such as label propagation, triangle enumeration, and PageRank, as well as a Graph API that simplifies implementing custom graph algorithms.
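The claim that relational queries yield the same result on bounded and unbounded data can be illustrated without Flink at all: aggregating a whole collection at once (the batch view) and folding the same records one at a time while updating state (the streaming view) produce identical results. A hedged, Flink-free sketch (names here are illustrative, not part of any Flink API):

```scala
object UnifiedSemanticsSketch {
  // Batch view: aggregate the bounded dataset in one pass
  def batchCount(words: Seq[String]): Map[String, Int] =
    words.groupBy(identity).map { case (w, ws) => (w, ws.size) }

  // Streaming view: process the same records one at a time, updating state incrementally
  def streamCount(words: Seq[String]): Map[String, Int] =
    words.foldLeft(Map.empty[String, Int]) { (state, w) =>
      state.updated(w, state.getOrElse(w, 0) + 1)
    }
}
```

For any input, the two views agree, which is the essence of the unified Table API/SQL semantics.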

One、The DataStream programming model

A DataStream program consists of four parts: Environment, DataSource, Transformation, and Sink.

Two、Flink DataSource (data sources)

2.1、File-based source (here, HDFS)

package com.chb.flink.source

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FileSource {
    def main(args: Array[String]): Unit = {
        // Initialize Flink's streaming (DataStream) execution environment
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        // Import the implicit conversions; putting the import here helps IDEA resolve the code correctly
        import org.apache.flink.streaming.api.scala._


        // Read the data
        val stream = streamEnv.readTextFile("hdfs://10.0.0.201:9000/README.txt")
        // Transformations
        val result: DataStream[(String, Int)] = stream.flatMap(_.split(","))
            .map((_, 1))
            .keyBy(0)
            .sum(1)
        // Print the result to the console
        result.print()
        // Launch the streaming job; without this line, nothing above will run
        streamEnv.execute("wordcount")
    }
}

2.2、Collection-based source

This is somewhat similar to creating a dataset from an in-memory collection in Spark.

package com.chb.flink.source

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object CollectionSource {
    def main(args: Array[String]): Unit = {
        // Initialize Flink's streaming (DataStream) execution environment
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        // Import the implicit conversions; putting the import here helps IDEA resolve the code correctly
        import org.apache.flink.streaming.api.scala._


        // Read the data
        val dataStream = streamEnv.fromCollection(Array(
            new StationLog("001", "186", "189", "busy", 1577071519462L, 0),
            new StationLog("002", "186", "188", "busy", 1577071520462L, 0),
            new StationLog("003", "183", "188", "busy", 1577071521462L, 0),
            new StationLog("004", "186", "188", "success", 1577071522462L, 32)
        ))
        dataStream.print()
        streamEnv.execute()
    }
}

/**
 *  Communication base-station log record
 * @param sid       base station ID
 * @param callOut   calling number
 * @param callIn    called number
 * @param callType  call type, e.g. call failed (fail), busy (busy), rejected (barring), connected (success)
 * @param callTime  call timestamp, in milliseconds
 * @param duration  call duration, in seconds
 */
case class StationLog(sid: String, var callOut: String, var callIn: String, callType: String, callTime: Long, duration: Long)

2.3、Kafka Source

First, add the dependency for the Kafka connector; more connectors are listed on the official website.

2.3.1、 Introduce dependencies

        <!-- Kafka connector-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.10.1</version>
            <exclusions>
                <exclusion>
                    <!-- Exclude the transitive Jackson dependency -->
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

2.3.2、Kafka Source: the first way (String values)

package com.chb.flink.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaSourceByString {
    def main(args: Array[String]): Unit = {
        // Initialize Flink's streaming (DataStream) execution environment
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        // Import the implicit conversions
        import org.apache.flink.streaming.api.scala._

        // Kafka configuration
        val props = new Properties()
        props.setProperty("bootstrap.servers", "ShServer:9092")
        props.setProperty("group.id", "chb01")
        props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
        props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
        props.setProperty("auto.offset.reset", "latest")

        // Use Kafka as the data source
        val flinkKafkaConsumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), props)
        val stream = streamEnv.addSource(flinkKafkaConsumer)
        stream.print()
        streamEnv.execute()
    }
}

2.3.3、Kafka Source: the second way (key/value)

package com.chb.flink.source

import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaSourceByKeyValue {
    def main(args: Array[String]): Unit = {
        // Initialize Flink's streaming (DataStream) execution environment
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        // Import the implicit conversions
        import org.apache.flink.streaming.api.scala._

        val props = new Properties()
        props.setProperty("bootstrap.servers", "ShServer:9092")
        props.setProperty("group.id", "fink02")
        props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
        props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
        props.setProperty("auto.offset.reset", "latest")
        // Use Kafka as the data source, keeping both key and value
        val stream = streamEnv.addSource(
            new FlinkKafkaConsumer[(String, String)]("test", new KafkaDeserializationSchema[(String, String)] {

                // Whether the stream has ended
                override def isEndOfStream(t: (String, String)) = false

                override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]) = {
                    if (consumerRecord != null) {
                        var key = "null"
                        var value = "null"
                        if (consumerRecord.key() != null)
                            key = new String(consumerRecord.key(), "UTF-8")
                        if (consumerRecord.value() != null)
                            value = new String(consumerRecord.value(), "UTF-8")
                        (key, value)
                    } else { // If the Kafka record is empty, return a fixed tuple
                        ("null", "null")
                    }
                }

                // Declare the produced type: a two-field tuple
                override def getProducedType =
                    createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
            }, props).setStartFromEarliest())
        stream.print()
        streamEnv.execute()
    }
}
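The null-safe byte decoding inside deserialize is easy to get wrong, and its core logic can be pulled out into a plain function and tested without Kafka or Flink. A minimal sketch (the object name is illustrative):

```scala
object KafkaDecodeSketch {
  // Null-safe UTF-8 decoding of a record's key and value byte arrays,
  // mirroring the defaults used in the deserializer above
  def decode(key: Array[Byte], value: Array[Byte]): (String, String) = {
    def str(bytes: Array[Byte]): String =
      if (bytes == null) "null" else new String(bytes, "UTF-8")
    (str(key), str(value))
  }
}
```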

2.3.3.1、A Kafka producer for testing

package com.chb.flink.source

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.util.Random

object MyKafkaProducer {
    def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.setProperty("bootstrap.servers", "ShServer:9092")
        // Note: serializers (not deserializers) on the producer side
        props.setProperty("key.serializer", classOf[StringSerializer].getName)
        props.setProperty("value.serializer", classOf[StringSerializer].getName)

        val producer = new KafkaProducer[String, String](props)

        val random = new Random()
        while(true) {
            producer.send(new ProducerRecord[String, String]("test", "key" + random.nextInt(), "value" + random.nextInt()))
            Thread.sleep(1000)
        }

    }

}

2.4、Custom Source

There are two ways to define a custom data source:
 Implement the SourceFunction interface to define a non-parallel Source (i.e., its parallelism can only be 1).
 Implement the ParallelSourceFunction interface, or extend RichParallelSourceFunction, to define a parallel data source.

2.4.1、A custom Source implementing SourceFunction

package com.chb.flink.source

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.util.Random

/**
 *  You can also define your own data source. There are two ways:
 *   Implement the SourceFunction interface to define a non-parallel Source
 *   (i.e., its parallelism can only be 1).
 *   Implement the ParallelSourceFunction interface, or extend RichParallelSourceFunction,
 *   to define a parallel data source.
 *
 *  Here we implement the SourceFunction interface.
 */
class MyCustomerSource extends SourceFunction[StationLog] {
    // Whether the stream should keep producing
    var flag = true

    /**
     *  The main method: starts the Source.
     *  In most cases, run implements a loop here so that data is produced continuously.
     *
     * @param sourceContext the context used to emit elements
     */
    override def run(sourceContext: SourceFunction.SourceContext[StationLog]):
    Unit = {
        val random = new Random()
        val types = Array("fail", "busy", "barring", "success")
        while (flag) { // Keep producing data until the stream is cancelled
            1.to(5).map(i => {
                val callOut = "1860000%04d".format(random.nextInt(10000))
                val callIn = "1890000%04d".format(random.nextInt(10000))
                new StationLog("station_" + random.nextInt(10), callOut, callIn, types(random.nextInt(4)), System.currentTimeMillis(), 0)
            }).foreach(sourceContext.collect(_)) // Emit the records
            Thread.sleep(2000) // Sleep for 2 seconds between batches
        }
        }
    }

    // Terminate the data stream 
    override def cancel(): Unit = flag = false
}

object CustomerSource {
    def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        import org.apache.flink.streaming.api.scala._

        val stream: DataStream[StationLog] = env.addSource(new MyCustomerSource)
        stream.print()
        env.execute()
    }
}

Three、Flink Sink (data targets)

Flink provides many ready-made data targets (Sinks) for DataStream, including files, Kafka, Redis, HDFS, Elasticsearch, and more.

3.1、HDFS Sink

3.1.1、Add the Hadoop FileSystem connector dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>1.10.1</version>
</dependency>

3.1.2、Streaming File Sink

https://ci.apache.org/project...

  The Streaming File Sink writes data into HDFS and supports bucketed writing, where each bucket corresponds to a directory in HDFS. By default, data is bucketed by hour. Within a bucket, the output is further split into smaller part files according to a rolling policy, which prevents individual bucket files from growing too large. The rolling policy is configurable; the default policy rolls files based on file size and a timeout, where the timeout is the duration during which no new data was written to a part file.

3.1.2.1、Rolling policies

  • DefaultRollingPolicy
  • CheckpointRollingPolicy
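The decision the default rolling policy makes can be pictured as a pure function of file size and timestamps. A hedged, Flink-free sketch of that decision (the real DefaultRollingPolicy also interacts with checkpoints; the names and parameters here are illustrative):

```scala
object RollingSketch {
  // Roll a part file when it grows past maxSize bytes, has been open
  // longer than rolloverMs, or has received no writes for inactivityMs
  def shouldRoll(sizeBytes: Long, createdAt: Long, lastWriteAt: Long, now: Long,
                 maxSize: Long, rolloverMs: Long, inactivityMs: Long): Boolean =
    sizeBytes >= maxSize ||
      (now - createdAt) >= rolloverMs ||
      (now - lastWriteAt) >= inactivityMs
}
```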

3.1.2.2、Bucket assigners

  • DateTimeBucketAssigner: the default, time-based assigner
  • BasePathBucketAssigner: an assigner that stores all part files in the base path (a single global bucket)

Note that checkpointing must be enabled; otherwise, all generated files stay in the in-progress state.

3.1.2.3、 Code implementation

package com.chb.flink.sink

import com.chb.flink.source.{MyCustomerSource, StationLog}
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object HDFSFileSink {
    def main(args: Array[String]): Unit = {
        // Initialize Flink's streaming (DataStream) execution environment
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        // Import the implicit conversions
        import org.apache.flink.streaming.api.scala._

        // Enable checkpointing; otherwise, all generated files stay in the in-progress state
        streamEnv.enableCheckpointing(1000)

        //  data source 
        val data: DataStream[StationLog] = streamEnv.addSource(new MyCustomerSource)

        // Create a file rolling policy
        val rolling: DefaultRollingPolicy[StationLog, String] = DefaultRollingPolicy.create()
            .withInactivityInterval(2000) // Roll if no data was written for 2 seconds
            .withRolloverInterval(2000) // Roll into a new file every 2 seconds; important
            .build()

        // Create the HDFS Sink
        val hdfsSink = StreamingFileSink.forRowFormat[StationLog](
            // Note that this is Flink's Path (org.apache.flink.core.fs.Path)
            new Path("hdfs://ShServer:9000/sink001/"), new SimpleStringEncoder[StationLog]("UTF-8"))
            .withBucketCheckInterval(1000) // How often to check whether a bucket should roll
            //            .withBucketAssigner(new MemberBucketAssigner)
            .withRollingPolicy(rolling)
            .build()

        //  add to sink
        data.addSink(hdfsSink)


        streamEnv.execute()
    }

    import java.text.SimpleDateFormat
    import java.util.Date

    import org.apache.flink.core.io.SimpleVersionedSerializer
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

    /**
     *  A custom bucket assigner
     */
    class MemberBucketAssigner extends BucketAssigner[StationLog, String] {
        // Bucket id in the form yyyy-MM-dd/HH
        override def getBucketId(info: StationLog, context: BucketAssigner.Context): String = {
            val date = new Date(info.callTime)
            new SimpleDateFormat("yyyy-MM-dd/HH").format(date)
        }
        }

        override def getSerializer: SimpleVersionedSerializer[String] = SimpleVersionedStringSerializer.INSTANCE
    }

}

3.2、A Redis-based Sink

Besides its built-in connectors, Flink has additional connectors released through Apache Bahir, including:
 Apache ActiveMQ (source/sink)
 Apache Flume (sink)
 Redis (sink)
 Akka (sink)
 Netty (source)

3.2.1、Dependency

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

3.2.2、Write the results to Redis

package com.chb.flink.sink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// Named RedisSinkTest to avoid shadowing the imported RedisSink connector class
object RedisSinkTest {
    def main(args: Array[String]): Unit = {
        // Initialize Flink's streaming (DataStream) execution environment
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        // Import the implicit conversions; putting the import here helps IDEA resolve the code correctly
        import org.apache.flink.streaming.api.scala._
        // Read the data
        val stream = streamEnv.socketTextStream("hadoop01", 8888)
        // Transformations
        val result = stream.flatMap(_.split(","))
            .map((_, 1))
            .keyBy(0)
            .sum(1)

        // Redis connection configuration
        val config = new FlinkJedisPoolConfig.Builder().setDatabase(1).setHost("hadoop01").setPort(6379).build()
        // Write into Redis
        result.addSink(new RedisSink[(String, Int)](config, new RedisMapper[(String, Int)] {
            override def getCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "t_wc")

            override def getKeyFromData(data: (String, Int)) = {
                data._1 // word 
            }

            override def getValueFromData(data: (String, Int)) = {
                data._2 + "" // The number of times a word appears 
            }
        }))
        streamEnv.execute()
    }
}

3.3、Kafka Sink

3.3.1、The first way: String values

package com.chb.flink.sink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

/**
 * Kafka Sink
 */
object KafkaSinkByString {
    def main(args: Array[String]): Unit = {
        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1) // Set the parallelism of every operator to 1
        import org.apache.flink.streaming.api.scala._

        // Read data from a netcat socket (a real-time stream)
        val stream1: DataStream[String] = streamEnv.socketTextStream("hadoop01", 8888)

        // Transformation
        val result = stream1.flatMap(_.split(","))

        // Write the data to Kafka as plain String values
        result.addSink(new FlinkKafkaProducer[String]("hadoop01:9092", "t_topic", new SimpleStringSchema()))
        streamEnv.execute()
    }
}

3.3.2、The second way: key/value with exactly-once semantics

package com.chb.flink.sink

import java.lang
import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

/**
 * Kafka Sink
 */
object KafkaSinkByKeyValue {
    def main(args: Array[String]): Unit = {
        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1) // Set the parallelism of every operator to 1
        import org.apache.flink.streaming.api.scala._

        // Read data from a netcat socket (a real-time stream)
        val stream1: DataStream[String] = streamEnv.socketTextStream("hadoop01", 8888)

        // Transformation
        val result = stream1.flatMap(_.split(","))
            .map((_, 1))
            .keyBy(0)
            .sum(1)

        // Kafka producer configuration
        val props = new Properties()
        props.setProperty("bootstrap.servers", "hadoop01:9092")
        props.setProperty("key.serializer", classOf[StringSerializer].getName)
        props.setProperty("value.serializer", classOf[StringSerializer].getName)

        // Write the data to Kafka as key/value pairs
        result.addSink(new FlinkKafkaProducer[(String, Int)]("t_topic",
            new KafkaSerializationSchema[(String, Int)] {
                override def serialize(element: (String, Int), aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
                    new ProducerRecord("t_topic", element._1.getBytes, (element._2 + "").getBytes())
                }
            }, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)) // EXACTLY_ONCE semantics
        streamEnv.execute()
    }
}

3.4、Custom Sink

package com.chb.flink.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.chb.flink.source.{MyCustomerSource, StationLog}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/*
 *  Read StationLog records from the custom Source and write them into a MySQL database with Flink.
 *
 *  You can define your own Sink in two ways:
 *  1、Implement the SinkFunction interface.
 *  2、Extend the RichSinkFunction class, which adds lifecycle-management capabilities.
 *  If the Sink needs to create a connection object during initialization, the second way is preferable.
 *  Requirement for this example: write StationLog objects into a MySQL database.
 */
object CustomJdbcSink {

    // A custom Sink that writes into MySQL
    class MyCustomSink extends RichSinkFunction[StationLog] {
        var conn: Connection = _
        var pst: PreparedStatement = _

        // Lifecycle management: called when the Sink is initialized
        override def open(parameters: Configuration): Unit = {
            conn = DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "123123")
            pst = conn.prepareStatement("insert into t_station_log(sid, call_out, call_in, call_type, call_time, duration) values(?, ?, ?, ?, ?, ?)")
        }

        // Write one StationLog record into the table t_station_log
        override def invoke(value: StationLog, context: SinkFunction.Context[_]): Unit = {
            pst.setString(1, value.sid)
            pst.setString(2, value.callOut)
            pst.setString(3, value.callIn)
            pst.setString(4, value.callType)
            pst.setLong(5, value.callTime)
            pst.setLong(6, value.duration)
            pst.executeUpdate()
        }

        override def close(): Unit = {
            pst.close()
            conn.close()
        }
    }

    def main(args: Array[String]): Unit = {
        // Initialize Flink's streaming (DataStream) execution environment
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

        streamEnv.setParallelism(1)
        // Import the implicit conversions; putting the import here helps IDEA resolve the code correctly
        import org.apache.flink.streaming.api.scala._
        val data: DataStream[StationLog] = streamEnv.addSource(new MyCustomerSource)
        // Write the data into MySQL
        data.addSink(new MyCustomSink)
        streamEnv.execute()
    }
}

Four、DataStream transformation operators

These are straightforward; the API documentation covers them: map, flatMap, filter, keyBy, reduce, window, union, connect, and so on.
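As a quick reminder of what the most common operators do, here is the word-count pipeline from section 2.1 replayed on a plain Scala collection; Flink's flatMap/map/keyBy/sum apply the same per-record logic, only incrementally over a stream:

```scala
object OperatorSketch {
  // flatMap -> map -> keyBy -> sum, expressed on an in-memory collection
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(","))  // flatMap: one line becomes many words
      .map((_, 1))            // map: word -> (word, 1)
      .groupBy(_._1)          // keyBy: partition by the word
      .map { case (w, pairs) => (w, pairs.map(_._2).sum) } // sum the counts per key
}
```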

Five、Function classes and rich function classes

Almost all of the operators in the previous section can take a custom function class or rich function class as a parameter, because Flink exposes interfaces for both. The common function interfaces include:

  • MapFunction
  • FlatMapFunction
  • ReduceFunction
  • .....

A rich function interface differs from an ordinary one in that it can access the context of the running environment, manage state (State) in that context, and has lifecycle methods, so it can implement more complex functionality. The rich function interfaces include:

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction
  • .....

5.1、Common function class example: output the dial time and end time of each call in a specified time format

5.2、Rich function class example: replace the phone numbers of successful calls with the real user names

The user table for the calling users (stored in MySQL) maps phone numbers to names.
Because the function needs to query the database, it must create a connection, and the connection-creation code belongs in the open lifecycle method, so a rich function class is required.
A Rich Function has a lifecycle. The typical lifecycle methods are:

  • open() is the rich function's initialization method; it is called before an operator such as map or filter is invoked.
  • close() is the last method called in the lifecycle and performs cleanup work.
  • getRuntimeContext() gives the function access to RuntimeContext information, such as the parallelism of the function, the task name, and the state.
package com.chb.flink.func

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import java.text.SimpleDateFormat

import com.chb.flink.source.StationLog
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 *  Function class examples: format the times of successful calls (common function class),
 *  and convert the phone numbers of successful calls into real user names (rich function class)
 */
object TestFunction {
    def main(args: Array[String]): Unit = {
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

        //  Implicit conversion 
        import org.apache.flink.streaming.api.scala._

        val data: DataStream[StationLog] = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
            .map(line => {
                val arr = line.split(",")
                new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
            })

        // Define the time output format 
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        // Filter the successful calls 
        data.filter(_.callType.equals("success"))
            .map(new CallMapFunction(format))
            .print()
        streamEnv.execute()
    }
}
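The CallMapFunction used above is not shown in the original; it is an ordinary (non-rich) MapFunction whose core is just time formatting. That formatting logic, pulled out into a Flink-free sketch (the object and method names are illustrative; in the real class this would run inside map of a MapFunction[StationLog, String]):

```scala
import java.text.SimpleDateFormat
import java.util.Date

object CallTimeFormat {
  // Describe a call with formatted start and end times;
  // callTime is epoch millis, duration is in seconds (per the StationLog fields)
  def describe(callOut: String, callIn: String, callTime: Long, duration: Long,
               format: SimpleDateFormat): String = {
    val start = new Date(callTime)
    val end = new Date(callTime + duration * 1000)
    callOut + " -> " + callIn + ", start: " + format.format(start) + ", end: " + format.format(end)
  }
}
```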


// A custom rich function class
class CallRichMapFunction() extends RichMapFunction[StationLog, StationLog] {
    var conn: Connection = _
    var pst: PreparedStatement = _

    // Lifecycle management: create the database connection during initialization
    override def open(parameters: Configuration): Unit = {
        conn = DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "123456")
        pst = conn.prepareStatement("select name from t_phone where phone_number =?")
    }

    override def map(in: StationLog): StationLog = {
        // Query the name of the calling user 
        pst.setString(1, in.callOut)
        val set1: ResultSet = pst.executeQuery()
        if (set1.next()) {
            in.callOut = set1.getString(1)
        }
        // Query the name of the called user 
        pst.setString(1, in.callIn)
        val set2: ResultSet = pst.executeQuery()
        if (set2.next()) {
            in.callIn = set2.getString(1)
        }
        in
    }

    // Close the connection 
    override def close(): Unit = {
        pst.close()
        conn.close()
    }

}

6、 ... and 、 Bottom ProcessFunctionAPI

ProcessFunction is a low-level stream processing operation that gives access to the basic building blocks of all streams:

  • Access to the event itself (e.g. the event's timestamp, the current key, etc.)
  • Managing state (only on a keyed stream)
  • Managing timers (registering and deleting timers, etc.)

In short, ProcessFunction is Flink's lowest-level API, and also its most powerful.

For example: monitor each phone number, and if calls to it have been failing for 5 seconds, send a warning message.

package com.chb.flink.func

import java.text.SimpleDateFormat
import java.util.Date

import com.chb.flink.source.StationLog
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

/**
 *  Monitor each phone number: if calls to it have been failing for 5 seconds, send a warning.
 *  If a non-failed call arrives within those 5 seconds, no warning is sent.
 */
object TestProcessFunction {
    def main(args: Array[String]): Unit = {
        // Initialize Flink's streaming execution environment
        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        // Import the implicit conversions
        import org.apache.flink.streaming.api.scala._

        // Read socket data 
        val data = streamEnv.socketTextStream("10.0.0.201", 8888)
            .map(line => {
                val arr = line.split(",")
                new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
            })
        // Processing data 
        data.keyBy(_.callOut)
            .process(new MonitorCallFail())
            .print()


        streamEnv.execute()
    }

    class MonitorCallFail() extends KeyedProcessFunction[String, StationLog, String] {
        // State holding the trigger time of the currently registered timer
        lazy val timeState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("time", classOf[Long]))

        //  Processing data 
        override def processElement(value: StationLog,
                                    context: KeyedProcessFunction[String, StationLog, String]#Context,
                                    collector: Collector[String]): Unit = {
            val time = timeState.value() // Read the timer time from state
            if (value.callType.equals("fail") && time == 0) { // First failure: arm a timer
                // Get the current processing time and register a timer 5 seconds from now
                val now = context.timerService().currentProcessingTime()
                val onTime = now + 5000L // 5 seconds later
                context.timerService().registerProcessingTimeTimer(onTime)
                println("first time: " + new Date())
                timeState.update(onTime)
            }

            // A non-failed call arrived: cancel the timer
            if (!value.callType.equals("fail") && time != 0) {
                context.timerService().deleteProcessingTimeTimer(time)
                timeState.clear()
            }


        }

        // The timer fired: emit the alarm
        override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, StationLog, String]#OnTimerContext,
                             out: Collector[String]): Unit = {
            val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
            val warnStr = "Trigger time: " + df.format(new Date(timestamp)) + "  Phone number: " + ctx.getCurrentKey
            out.collect(warnStr)
            timeState.clear()
        }

    }


}
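The two branches in `processElement` above (arm a timer on the first failure, disarm it on a later success) form a small per-key state machine. Extracting that decision as a pure function makes it unit-testable without a Flink runtime. A sketch under that idea; the `TimerAction` ADT and `decide` are hypothetical names, and the 5000L delay matches the 5-second window in the example:

```scala
// Hypothetical pure model of MonitorCallFail's timer logic.
sealed trait TimerAction
case class Register(fireAt: Long) extends TimerAction // first failure: arm a 5s timer
case class Cancel(at: Long) extends TimerAction       // non-failure while armed: disarm
case object NoOp extends TimerAction                  // nothing to change

object FailMonitor {
  // storedTimer == 0 means "no timer armed", matching the example's
  // use of the ValueState default value.
  def decide(callType: String, storedTimer: Long, now: Long): TimerAction =
    if (callType == "fail" && storedTimer == 0L) Register(now + 5000L)
    else if (callType != "fail" && storedTimer != 0L) Cancel(storedTimer)
    else NoOp
}
```

`processElement` would then pattern-match on the returned action and call `registerProcessingTimeTimer`, `deleteProcessingTimeTimer`, or nothing, keeping the Flink plumbing separate from the business rule.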

7. Side output stream (Side Output)

When Flink processes a data stream, we often need to split one source into streams of different types:

  • Splitting the source with filter operators duplicates the data stream, causing unnecessary performance waste;
  • Side output splits the stream with a mechanism that does not copy it;
  • Flink's side output can also collect late data, so late records need not be discarded.

Case study: from the base-station log, output the successful calls as the main stream and the unsuccessful calls as a side output stream.

package com.chb.flink.func

import com.chb.flink.source.StationLog
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

/**
 *  Output successful calls to the main stream and unsuccessful calls to a side output stream.
 */
object TestSideOutputStream {


    // A side output stream first needs an OutputTag to label the stream;
    // note that the implicit conversions must be imported before the tag is defined
    import org.apache.flink.streaming.api.scala._
    val notSuccessTag = new OutputTag[StationLog]("not_success")

    def main(args: Array[String]): Unit = {
        // Initialize Flink's streaming execution environment
        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)


        // Read file data 
        val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
            .map(line => {
                val arr = line.split(",")
                new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
            })
        val mainStream: DataStream[StationLog] = data.process(new CreateSideOutputStream(notSuccessTag))

        // Get the sidestream 
        val sideOutput: DataStream[StationLog] = mainStream.getSideOutput(notSuccessTag)
        mainStream.print("main")
        sideOutput.print("sideoutput")
        streamEnv.execute()
    }

    class CreateSideOutputStream(tag: OutputTag[StationLog]) extends ProcessFunction[StationLog, StationLog] {
        override def processElement(value: StationLog, ctx: ProcessFunction[StationLog, StationLog]#Context, out: Collector[StationLog]): Unit = {
            if (value.callType.equals("success")) { // Emit to the main stream
                out.collect(value)
            } else { // Emit to the side output
                ctx.output(tag, value)
            }
        }
    }

}
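`CreateSideOutputStream` splits the stream two ways on `callType`, but the same pattern extends to any number of side outputs by routing on a per-record key, one `OutputTag` per category. A pure-Scala sketch of just the routing rule (the `CallRouter` name and the `"side:..."` labels are hypothetical; in Flink each label would be its own `OutputTag` passed to `ctx.output`):

```scala
// Hypothetical routing rule: "main" carries successful calls, every
// other call type goes to a side output named after it.
object CallRouter {
  def route(callType: String): String =
    callType match {
      case "success" => "main"          // emitted via out.collect in Flink
      case other     => s"side:$other"  // emitted via ctx.output(tag, value)
    }
}
```

Keeping the routing decision in one match expression like this makes it easy to add a new category (e.g. `"busy"`) without touching the collection plumbing.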



Flink series directory

Follow my official account 【Baoge big data】 for more content.


Copyright notice
This article was created by [Baoge]; please include the original link when reposting. Thanks.
https://chowdera.com/2021/06/20210623214918502b.html