
Flink DataSource Trilogy Part One: Direct API

2020-11-06 20:59:03 Programmer Xinchen

Welcome to visit my GitHub

https://github.com/zq2599/blog_demos

Contents: categorized summaries of all my original articles, with companion source code, covering Java, Docker, Kubernetes, DevOps, and more.

This is the first article in the "Flink DataSource Trilogy" series, which aims to learn and understand Flink's DataSource through hands-on practice and lay a solid foundation for further study. The series consists of three parts:

  1. Direct API: this article. Besides preparing the environment and the project, it covers the data-creation APIs provided directly by StreamExecutionEnvironment;
  2. Built-in connectors: the addSource method of StreamExecutionEnvironment, whose argument can be one of Flink's built-in connectors, such as Kafka, RabbitMQ, etc.;
  3. Custom sources: the addSource method of StreamExecutionEnvironment, whose argument can be a custom implementation of SourceFunction;

Links to the Flink DataSource Trilogy articles:

  1. Flink DataSource Trilogy Part One: Direct API
  2. Flink DataSource Trilogy Part Two: Built-in Connectors
  3. Flink DataSource Trilogy Part Three: Custom Sources

About Flink's DataSource

The official explanation of DataSource is: "Sources are where your program reads its input from." In other words, a DataSource is the data source of the application, as shown in the two red boxes below: (image)

DataSource types

For common needs such as reading text files, Kafka, RabbitMQ, and so on, you can directly use the APIs or connectors provided by Flink; if these do not meet your needs, you can also develop your own. The diagram below shows my own understanding: (image)

Environment and version

The best way to master the built-in DataSources is hands-on practice. The environment and versions used are as follows:

  1. JDK:1.8.0_211
  2. Flink:1.9.2
  3. Maven:3.6.0
  4. operating system :macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
  5. IDEA:2018.3.5 (Ultimate Edition)

Source download

If you don't want to write the code yourself, the source for the whole series can be downloaded from GitHub (https://github.com/zq2599/blog_demos); the addresses and links are shown in the table below:

| Name | Link | Remarks |
| --- | --- | --- |
| Project home page | https://github.com/zq2599/blog_demos | The project's home page on GitHub |
| Git repository address (https) | https://github.com/zq2599/blog_demos.git | Repository address of the project source, https protocol |
| Git repository address (ssh) | git@github.com:zq2599/blog_demos.git | Repository address of the project source, ssh protocol |

This git project contains multiple folders; the application for this chapter is under the flinkdatasourcedemo folder, as shown in the red box below: (image)


Create the project

  1. Run the following command in a console to enter the interactive mode for creating a Flink application, then enter the groupId and artifactId at the prompts (I entered com.bolingcavalry for groupId and flinkdatasourcedemo for artifactId):
mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2
  2. The Maven project is now generated; import it into IDEA, as shown below: (image)
  3. Import it as a Maven project: (image)
  4. This is what a successful import looks like: (image)
  5. The project has been created; now we can start writing code.

Auxiliary class: Splitter

One operation is used repeatedly: splitting a string on spaces and turning it into a collection of Tuple2 elements. Let's put this operator into a common class, Splitter.java; the code is as follows:

package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {

        // Skip null, empty, or whitespace-only lines
        if (StringUtils.isNullOrWhitespaceOnly(s)) {
            System.out.println("invalid line");
            return;
        }

        // Emit a (word, 1) tuple for every space-separated word
        for (String word : s.split(" ")) {
            collector.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}
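To see what the Splitter above feeds into a keyBy(0).sum(1) pipeline, here is a plain-Java sketch that reproduces the same split-and-count logic on sample lines; it needs no Flink dependencies, and all class and method names here are my own, not part of the article's project:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SplitterSketch {
    // Mimics Splitter.flatMap: emit one (word, 1) pair per space-separated word
    static List<SimpleEntry<String, Integer>> flatMap(String line) {
        List<SimpleEntry<String, Integer>> out = new ArrayList<>();
        if (line == null || line.trim().isEmpty()) {
            return out; // invalid line: emit nothing
        }
        for (String word : line.split(" ")) {
            out.add(new SimpleEntry<>(word, 1));
        }
        return out;
    }

    public static void main(String[] args) {
        // Mimics keyBy(0).sum(1) over a whole window: total count per word
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : new String[]{"aaa bbb", "aaa"}) {
            for (SimpleEntry<String, Integer> e : flatMap(line)) {
                counts.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        System.out.println(counts); // prints {aaa=2, bbb=1}
    }
}
```

Running this prints the per-word totals you should expect from the Flink jobs below when the same lines arrive in one window.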

Everything is ready, so it's time for hands-on practice, starting with the simplest case: the Socket DataSource.

Socket DataSource

The Socket DataSource listens on the specified port of the specified IP address and reads data from the network;

  1. Create a class named Socket.java in the new project:
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Socket {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Listen on local port 9999 and read strings
        DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);

        // Every five seconds, split the strings received in that window on spaces, count the words, and print the result
        socketDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("API DataSource demo : socket");
    }
}

As the code above shows, StreamExecutionEnvironment.socketTextStream creates a Socket-type DataSource. Run nc -lk 9999 in a console to enter interactive mode; any string you type followed by Enter is sent to local port 9999;

  2. Run the Socket class in IDEA; after it starts, return to the console running nc -lk 9999, type some strings, and press Enter. You can see that the Socket DataSource is working: (image)

Collection DataSource (generateSequence)

  1. The collection-based DataSource APIs are shown in the figure below: (image)
  2. First try the simplest one, generateSequence, which creates a DataSource of numbers in a specified range:

package com.bolingcavalry.api;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GenerateSequence {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism to 1
        env.setParallelism(1);

        // Use generateSequence to create a DataSource of Long values
        DataStream<Long> dataStream = env.generateSequence(1, 10);

        // Filter to keep only even numbers, then print
        dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return 0L==aLong.longValue()%2L;
            }
        }).print();

        env.execute("API DataSource demo : collection");
    }
}
  3. Only the even numbers are printed at run time: (image)
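As a quick cross-check of the filter above, the expected output can be computed in plain Java without Flink; this small sketch (names are my own) applies the same even-number predicate to the range 1..10:

```java
import java.util.ArrayList;
import java.util.List;

public class EvenFilterSketch {
    // Same predicate as the FilterFunction above: keep only even values
    static List<Long> evens(long from, long to) {
        List<Long> result = new ArrayList<>();
        for (long i = from; i <= to; i++) {
            if (i % 2L == 0L) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(evens(1, 10)); // prints [2, 4, 6, 8, 10]
    }
}
```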

Collection DataSource (fromElements + fromCollection)

  1. Let's try fromElements and fromCollection in a single class. Create a class named FromCollection containing the usage of both APIs:
package com.bolingcavalry.api;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class FromCollection {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism to 1
        env.setParallelism(1);

        // Create a List containing two Tuple2 elements
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2("aaa", 1));
        list.add(new Tuple2("bbb", 1));

        // Create a DataStream from the List
        DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

        // Create a DataStream from several Tuple2 elements
        DataStream<Tuple2<String, Integer>> fromElementDataStream = env.fromElements(
                new Tuple2("ccc", 1),
                new Tuple2("ddd", 1),
                new Tuple2("aaa", 1)
        );

        // Merge the two DataStreams into one with union
        DataStream<Tuple2<String, Integer>> unionDataStream = fromCollectionDataStream.union(fromElementDataStream);

        // Count the number of words 
        unionDataStream
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : collection");
    }
}
  2. The result of running it: (image)
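Note that on an unwindowed stream, keyBy(0).sum(1) is a rolling aggregate: it emits an updated total for every incoming element, so the key aaa, which occurs in both streams, is printed twice, first with count 1 and then with count 2. A plain-Java simulation of that rolling behavior (no Flink required; class and method names are my own):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingSumSketch {
    // Emits one updated (word, total) pair per input element, like keyBy(0).sum(1)
    static List<SimpleEntry<String, Integer>> rollingSum(List<SimpleEntry<String, Integer>> input) {
        Map<String, Integer> totals = new HashMap<>();
        List<SimpleEntry<String, Integer>> emitted = new ArrayList<>();
        for (SimpleEntry<String, Integer> e : input) {
            int total = totals.merge(e.getKey(), e.getValue(), Integer::sum);
            emitted.add(new SimpleEntry<>(e.getKey(), total));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<SimpleEntry<String, Integer>> input = new ArrayList<>();
        input.add(new SimpleEntry<>("aaa", 1));
        input.add(new SimpleEntry<>("bbb", 1));
        input.add(new SimpleEntry<>("ccc", 1));
        input.add(new SimpleEntry<>("ddd", 1));
        input.add(new SimpleEntry<>("aaa", 1));
        System.out.println(rollingSum(input));
        // prints [aaa=1, bbb=1, ccc=1, ddd=1, aaa=2]
    }
}
```

The exact interleaving of the two unioned streams may vary, but the two aaa records always produce totals 1 and then 2.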

File DataSource

  1. The ReadTextFile class below reads a text file from an absolute path and performs a word count on its content:
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFile {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);

        // Use a text file as the data source
        DataStream<String> textDataStream = env.readTextFile("file:///Users/zhaoqin/temp/202003/14/README.txt", "UTF-8");

        // Count the number of words and print them out 
        textDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : readTextFile");
    }
}
  2. Make sure a file named README.txt exists at the absolute path used in the code; the result of running it: (image)
  3. Open the StreamExecutionEnvironment.java source and look at the readTextFile method we just used, shown below. It turns out it calls another overload of the same name: the third parameter of that overload determines whether the file is read once or periodically scanned for content changes, and the fourth parameter is the interval between periodic scans:

public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
		Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be null or blank.");

		TextInputFormat format = new TextInputFormat(new Path(filePath));
		format.setFilesFilter(FilePathFilter.createDefaultFilter());
		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
		format.setCharsetName(charsetName);

		return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
	}
  4. The FileProcessingMode above is an enum; its source is as follows:
@PublicEvolving
public enum FileProcessingMode {

	/** Processes the current contents of the path and exits. */
	PROCESS_ONCE,

	/** Periodically scans the path for new data. */
	PROCESS_CONTINUOUSLY
}
  1. Please also pay attention to <font color="blue">readTextFile</font> Methodical <font color="red">filePath</font> Parameters , This is a URI String of type , Except for the local file path , It can also be HDFS The address of :<font color="blue">hdfs://host:port/file/path</font>
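Based on the readTextFile overload and the FileProcessingMode enum shown above, continuous monitoring can be requested by calling readFile directly with PROCESS_CONTINUOUSLY. The sketch below is my own code, not part of the article's project, and assumes the Flink 1.9 dependencies used in this article are on the classpath; it rescans the file every 5 seconds:

```java
package com.bolingcavalry.api;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ReadTextFileContinuously {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String filePath = "file:///Users/zhaoqin/temp/202003/14/README.txt";
        TextInputFormat format = new TextInputFormat(new Path(filePath));
        format.setCharsetName("UTF-8");

        // PROCESS_CONTINUOUSLY: rescan the path every 5000 ms for changes;
        // note that a modified file is re-read in full, not incrementally
        DataStream<String> textDataStream = env.readFile(
                format,
                filePath,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                5000L,
                BasicTypeInfo.STRING_TYPE_INFO);

        textDataStream.print();

        env.execute("API DataSource demo : readFile continuously");
    }
}
```

One design caveat: with PROCESS_CONTINUOUSLY, a file whose modification time changes is reprocessed entirely, so previously counted lines would be counted again; this is why the word-count example above uses the one-shot readTextFile instead.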

That concludes the hands-on practice of creating DataSources through the direct API. Next, we will continue with DataSources based on built-in connectors;

Welcome to follow my official account: search for "Programmer Xinchen" on WeChat. I'm Xinchen, looking forward to exploring the Java world with you... https://github.com/zq2599/blog_demos

Copyright notice
This article was created by [Programmer Xinchen]; please include a link to the original when reposting. Thank you.