
How Apache Flink Handles Out-of-Order Data in Real-Time Computing

2020-12-06 11:57:24 Programmer Kay

1. The Future of Stream Computing

After Google published its three papers on GFS, BigTable, and MapReduce, big data technology took its first real leap, and the Hadoop ecosystem gradually grew up around them.

Hadoop is very good at processing massive amounts of data, with the following characteristics:

1. Data must be fully prepared before the computation can start;

2. The final result is produced only after all of the data has been processed, which completes the computation;

3. Timeliness is low, so it is not suited to real-time computing.

With the growth of businesses such as real-time recommendation and risk control, the requirements on data processing latency and real-time behavior have become ever stricter, and Flink began to stand out in the community.

As a true stream processing framework, Apache Flink offers low latency, guarantees that messages are neither lost nor duplicated, delivers very high throughput, and supports stream processing natively.

This article introduces Flink's notions of time, window computation, and how Flink handles out-of-order data inside a window.

Flink has three main notions of time:

(1) the time at which the event occurred, called Event Time;

(2) the time at which the data enters Flink, called Ingestion Time;

(3) the system time of the machine on which the data is processed, called Processing Time.

Processing time is the simplest notion of time: it requires no coordination between streams and machines, and gives the best performance and the lowest latency. In a distributed environment, however, the processing times of different machines cannot be kept strictly consistent, so the results are not deterministic.

Event time is the time at which the event actually occurred; it is already carried inside each record before the record enters Flink. By extracting the event timestamp, the processing can reflect the order in which the events actually happened.


We know that a streaming dataset has no boundary: data keeps arriving in our system continuously.

The ultimate goal of stream computing is to produce aggregated statistics over the data, but running a single global window over an unbounded dataset is unrealistic.

Only by slicing the stream into windows of a certain size can we compute results and finally deliver the aggregates to downstream systems for analysis and display.
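To make window slicing concrete, here is a minimal plain-Java sketch (no Flink involved; the class and method names are my own) that computes which tumbling window an event timestamp belongs to, the same way a 10-minute window such as 11:50 ~ 12:00 is delimited:

```java
public class TumblingWindowSketch {
    // Assign an event timestamp to the tumbling window [start, start + size) it falls into
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    public static void main(String[] args) {
        // timestamps in minutes since midnight, 10-minute windows:
        // 11:55 falls into the window that starts at 11:50 (minute 710)
        System.out.println(windowStart(11 * 60 + 55, 10));
    }
}
```

Every timestamp maps to exactly one window, which is what lets an unbounded stream be aggregated piece by piece.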

When doing window computation in Flink, you need to know two core pieces of information:

  • the EventTime timestamp of each element (it can be specified in the data record);
  • as data arrives, when can the statistical computation be triggered? (e.g. when all of the data for the 11:00 ~ 11:10 window has been received)

Ordered events

Assume, under ideal conditions, that the data arrives strictly in order. In that case the stream computing engine can compute every window correctly.


Out-of-order events

In reality, however, data can arrive out of strict order for many reasons (system delays, network delays, and so on), and some records arrive very late. Flink therefore needs a mechanism that tolerates out-of-order data within a certain bound.

This mechanism is called the watermark.


In the figure above there is a parameter, MaxOutOfOrderness = 4: the maximum out-of-orderness, i.e. how far out of order the data is allowed to be. It could be 4 minutes, 4 hours, and so on.

The watermark generation strategy is: the maximum event timestamp seen so far minus the MaxOutOfOrderness value.

As shown above, event 7 produces the watermark w(3), and event 11 produces w(7). Event 9, however, is smaller than event 11, so it does not trigger a watermark update. Event 15 then produces w(11). In other words, the watermark reflects the overall progress of the event stream: it only ever rises, it never falls.

A watermark asserts that all events with timestamps smaller than the watermark value have already reached the window.

Whenever a new maximum timestamp appears, a new watermark is produced.
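This rule can be sketched in a few lines of plain Java (an illustration of the rule, not Flink's implementation; the names are mine). Feeding it the event timestamps 7, 11, 9, 15 from the figure with MaxOutOfOrderness = 4 yields the watermarks 3, 7, 7, 11: event 9 does not pull the watermark back.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {
    // watermark = max event timestamp seen so far - maxOutOfOrderness
    static List<Long> watermarks(long[] eventTimes, long maxOutOfOrderness) {
        List<Long> result = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        for (long t : eventTimes) {
            maxTimestamp = Math.max(maxTimestamp, t); // never falls back
            result.add(maxTimestamp - maxOutOfOrderness);
        }
        return result;
    }

    public static void main(String[] args) {
        // the sequence from the figure: 7, 11, 9 (out of order), 15
        System.out.println(watermarks(new long[]{7, 11, 9, 15}, 4));
    }
}
```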

Late events

An event whose event time is smaller than the current watermark is called a late event. Late events are not included in the window's statistics.

As shown below, after event 21 enters the system, the watermark w(17) is produced. The later event 16 is smaller than the current watermark w(17), so it is not counted.


When the computation is triggered

Let's use a sequence of figures to show when a window's computation is triggered.

As shown below, we have a window covering 11:50 to 12:00. A record (cat, 11:55) arrives; its event time is 11:55, so it falls into the window. With a maximum delay of 5 minutes, the current watermark is 11:50.


Another record arrives: (dog, 11:59). Its event time is 11:59, so it also enters the window.

Because this event time is larger than the previous one, the watermark advances to 11:54. The watermark is still earlier than the window's end time, so the computation is still not triggered.


Then comes another record, (cow, 12:06). The watermark is now updated to 12:01, which is past the window's end time, so the window computation is triggered (assume the computation logic counts the distinct elements in the window).


Now suppose one more event arrives: (dog, 11:58). Because its timestamp is smaller than the watermark, and the window was destroyed after the previous trigger, this event does not trigger any computation.

At this point you can route such events into a side output stream and handle them with additional logic.
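The whole walk-through above can be replayed with a small plain-Java simulation (an illustration only, not Flink code; the names and structure are mine). Timestamps are minutes since midnight, the window is [11:50, 12:00), and the allowed out-of-orderness is 5 minutes:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class WindowTriggerSketch {
    // Replays events against one window: fires when the watermark passes windowEnd,
    // and flags events that arrive below the watermark after the window has fired.
    static List<String> replay(String[] names, long[] times,
                               long windowEnd, long maxOutOfOrderness) {
        List<String> log = new ArrayList<>();
        Set<String> window = new LinkedHashSet<>(); // distinct elements in the window
        long watermark = Long.MIN_VALUE;
        boolean fired = false;
        for (int i = 0; i < names.length; i++) {
            if (fired && times[i] < watermark) {
                log.add(names[i] + " is late"); // in Flink this could go to a side output
                continue;
            }
            if (times[i] < windowEnd) {
                window.add(names[i]); // element belongs to this window
            }
            watermark = Math.max(watermark, times[i] - maxOutOfOrderness);
            if (!fired && watermark >= windowEnd) {
                fired = true;
                log.add("fire: " + window.size() + " distinct elements");
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // cat@11:55, dog@11:59, cow@12:06 (fires the window), dog@11:58 (late)
        String[] names = {"cat", "dog", "cow", "dog"};
        long[] times = {11 * 60 + 55, 11 * 60 + 59, 12 * 60 + 6, 11 * 60 + 58};
        System.out.println(replay(names, times, 12 * 60, 5));
    }
}
```

The cow record advances the watermark to 12:01 and fires the window with 2 distinct elements (cat, dog); the final dog record is then flagged as late, matching the figures.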


In version 1.11, the watermark generation interface was redesigned. In the new version, different watermark generation strategies are configured mainly through the WatermarkStrategy class.

The new interface provides many static methods and methods with default implementations. If you want to define your own generation strategy, you can implement the interface yourself.


The strategy is responsible for creating a WatermarkGenerator.


This interface is also very simple:

  • onEvent: if you want to generate a watermark from every element and send it downstream, you can do it here;
  • onPeriodicEmit: when the data volume is large, generating a watermark for every record would hurt performance, so there is also a method for emitting watermarks periodically.
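The division of labor between the two callbacks can be sketched in plain Java (the method names are borrowed from Flink's WatermarkGenerator interface, but this sketch has no Flink dependency and the class names are mine): onEvent only tracks the largest timestamp seen, while onPeriodicEmit computes the watermark on a timer.

```java
public class PeriodicGeneratorSketch {
    static class BoundedOutOfOrdernessGenerator {
        private final long maxOutOfOrderness;
        private long maxTimestamp = Long.MIN_VALUE;

        BoundedOutOfOrdernessGenerator(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        // called per record: cheap bookkeeping only, no watermark emitted
        void onEvent(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        // called on a timer: emit the current watermark once per period
        long onPeriodicEmit() {
            return maxTimestamp - maxOutOfOrderness;
        }
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessGenerator g = new BoundedOutOfOrdernessGenerator(4);
        g.onEvent(7);
        g.onEvent(11);
        g.onEvent(9); // out of order, does not lower the max
        System.out.println(g.onPeriodicEmit()); // 11 - 4 = 7
    }
}
```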

To make development easier, Flink also provides some built-in watermark generation strategies:

  • Fixed-delay watermarks
    To generate watermarks with a fixed delay of 3 seconds, you can write:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
  • Monotonically increasing watermarks
    Equivalent to the fixed-delay strategy above with the delay removed: the event's own timestamp serves as the watermark. It can be used like this:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

5. A simple example: counting letter occurrences within a window

public class StreamTest1 {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public static class MyLog {
        private String msg;
        private Integer cnt;
        private long timestamp;
    }

    public static class MySourceFunction implements SourceFunction<MyLog> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<MyLog> ctx) throws Exception {
            // keep emitting until cancel() flips the flag
            while (running) {
                Thread.sleep(1000);
                ctx.collect(new MyLog(RandomUtil.randomString(1), 1, System.currentTimeMillis()));
            }
        }

        @Override
        public void cancel() {
            this.running = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Custom source: emits one random single-letter message every second
        env.addSource(new MySourceFunction())
                // Watermark strategy: max event time minus 5s; event time field is timestamp
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<MyLog>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event, timestamp) -> event.timestamp))
                // Group by message
                .keyBy((event) -> event.msg)
                // Define a 10s time window
                .timeWindow(Time.seconds(10))
                // Count how many times each message appears
                .sum("cnt")
                // Print the output
                .print();

        env.execute("log_window_cnt");
    }
}

Copyright notice
This article was written by [Programmer Kay]. If you repost it, please include a link to the original. Thanks.
https://chowdera.com/2020/12/20201206115524894p.html