
New features of Flink | CDC (change data capture) principle and practical application

2020-12-08 12:29:16 Wang Zhiwu

Big data technology and architecture


A brief introduction to CDC

CDC (Change Data Capture) lets us capture changes made in a database and deliver them downstream for consumption. These changes include INSERT, DELETE, UPDATE, and so on.

Users can apply CDC in scenarios such as the following:

  • Data synchronization with Flink SQL: you can synchronize data from one data source to another, for example from MySQL to Elasticsearch (see the sketch after this list)
  • Materializing an aggregate view over the source database in real time
  • Because only incremental changes are synchronized, data can be kept in sync in real time with low latency
  • Joining a temporal table with event time, so that accurate results can be obtained
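
To make the first scenario concrete, below is a minimal sketch of synchronizing a MySQL table into Elasticsearch with Flink SQL. The table names, fields, and connection options are placeholders for illustration, and it assumes the Elasticsearch connector is available alongside mysql-cdc:

-- a hypothetical MySQL table captured via CDC
CREATE TABLE products_source (
  id INT NOT NULL,
  name STRING,
  weight DECIMAL(10, 3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'inventory',
  'table-name' = 'products'
);

-- a hypothetical Elasticsearch index used as the sink;
-- the primary key lets the sink apply updates and deletes as upserts
CREATE TABLE products_sink (
  id INT,
  name STRING,
  weight DECIMAL(10, 3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'products'
);

-- continuously replicate inserts, updates and deletes
INSERT INTO products_sink SELECT id, name, weight FROM products_source;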

Flink 1.11 can ingest these changelogs into the Table API and SQL. Two formats are currently supported: Debezium and Canal. This means the source table is no longer append-only; it can also carry upsert and delete operations.

Some scenarios where Flink CDC is applicable:

  • Incremental data synchronization between databases
  • Audit logs
  • Real-time materialized views on databases
  • Dimension table joins based on CDC (see the sketch after this list)
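
To make the last scenario concrete as well, here is a minimal sketch of enriching an order stream with a CDC-backed dimension table. A plain streaming join is used for simplicity, products_source is the mysql-cdc table from the previous sketch, and the orders table and its fields are assumptions:

-- a hypothetical order stream read from Kafka
CREATE TABLE orders (
  order_id BIGINT,
  product_id INT,
  order_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- the join result is updated whenever the dimension table changes
SELECT o.order_id, o.order_time, p.name, p.weight
FROM orders AS o
JOIN products_source AS p ON o.product_id = p.id;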

How to use Flink CDC

Currently two built-in connectors are supported, PostgreSQL and MySQL; the rest of this article takes MySQL as the example.
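
For reference, the PostgreSQL connector is declared in much the same way. The following is only a rough sketch, with the connection options and table fields chosen as placeholders:

-- a hypothetical postgres-cdc table source
CREATE TABLE shipments (
  shipment_id INT,
  order_id INT,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments'
);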

Flink 1.11 only supports Kafka as an out-of-the-box changelog source and only JSON-encoded changelogs; Avro (Debezium) and Protobuf (Canal) are planned for future releases. There are also plans to support MySQL binlogs and Kafka compacted topics as sources, and to extend changelog support to batch execution.

Flink CDC as a listener for incremental changes

How is business data synchronized in a traditional real-time pipeline? Taking Canal as an example: Canal processes the MySQL binlog and synchronizes it to Kafka; a computing engine such as Spark, Flink, or Storm then transforms the data; and the results are written to third-party storage such as HBase or Elasticsearch. As shown in the figure below, the pipeline is divided into three modules: E (Extract), T (Transform), and L (Load). You can see that many components are involved and the chain is long.

With Flink CDC we can consume the database's incremental logs directly, replacing Canal as the data acquisition layer, perform the computation right away, and send the results downstream. The overall architecture is as follows:

This architecture has several advantages:

  • Lower maintenance cost for Canal and Kafka, a shorter pipeline, and lower latency
  • Flink provides exactly-once semantics
  • Reading can start from a specified position
  • Removing Kafka reduces the cost of storing messages

We need to add the corresponding dependency to the pom; for MySQL it is as follows:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.1.0</version>
</dependency>

If you use the SQL Client, you need to download flink-sql-connector-mysql-cdc-1.1.0.jar and put it under <FLINK_HOME>/lib/.

An example of the SQL for connecting to a MySQL database is as follows:

-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 name STRING,
 description STRING,
 weight DECIMAL(10,3)
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'flinkuser',
 'password' = 'flinkpw',
 'database-name' = 'inventory',
 'table-name' = 'products'
);
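
Once the table is declared, it can be queried like any other table. The queries below are only illustrations: the first watches the raw change stream, the second builds a real-time aggregate view on top of the captured table.

-- watch the raw change stream
SELECT * FROM mysql_binlog;

-- a real-time materialized aggregate over the captured table
SELECT name, COUNT(*) AS cnt, SUM(weight) AS total_weight
FROM mysql_binlog
GROUP BY name;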

Using the API:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
 
public class MySqlBinlogSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
      .hostname("localhost")
      .port(3306)
      .databaseList("inventory") // monitor all tables under inventory database
      .username("flinkuser")
      .password("flinkpw")
      .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
      .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env
      .addSource(sourceFunction)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    env.execute();
  }
}

Flink CDC as a transformation tool

If Flink is needed only as the computing layer, Flink currently provides two formats for consuming changelogs: canal-json and debezium-json. Let's briefly introduce them.

If you want to use Kafka's canal-json format in a program, you need to add the following dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.0</version>
</dependency>

Then we can consume canal-json data directly:

CREATE TABLE topic_products (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json'  -- using canal-json as the format
)
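
The debezium-json format is used in the same way; a minimal sketch of the counterpart table (the topic name and fields are assumptions) looks like this:

CREATE TABLE topic_products_dbz (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_dbz',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'debezium-json'  -- using debezium-json as the format
);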

Changelog format

If you want to use Kafka's changelog-json format in a program, you need to add the following dependency:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <version>1.0.0</version>
</dependency>

If you want to use the Flink SQL Client, you need to download flink-format-changelog-json-1.0.0.jar and put the jar under the lib folder of the Flink installation directory.

-- assuming we have a user_behavior topic of logs
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',  -- using kafka connector
    'topic' = 'user_behavior',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format' = 'json'  -- the data format is json
);

-- we want to store the UV aggregation result in kafka using changelog-json format
create table day_uv (
    day_str STRING,
    uv BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'day_uv',
    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format' = 'changelog-json'  -- the data format is json
);

-- write the UV results into kafka using changelog-json format
INSERT INTO day_uv
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') as day_str, count(distinct user_id) as uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd');

-- reading the changelog back again
SELECT * FROM day_uv;

Copyright notice:

This article belongs to Big data technology and architecture and is exclusively authorized by the original author. Reprinting without the original author's permission will be pursued as infringement.

Editor | Cold eye

WeChat official account |import_bigdata

