当前位置: 首页 > 工具软件 > s-json > 使用案例 >

Flink SQL 1.12 深度实践 一 Kafka source (json+debezium json+复杂json)

萧展鹏
2023-12-01

 

目录

序言

一. kafka的数据源 

 1.1 json

1.1.1  flink sql ddl 

1.1.2  数据准备

1.1.3  开启sql-client 

1.1.4 代码

1.1.6 配置

 

1.2 debeizum-json

1.2.1  flink sql

1.2.2 准备数据

1.2.3 sql-client

1.2.4 代码 

1.2.5 采坑 

 (3)关于debeizum 的 metadata如何获取?

1.2.6 用debezium-json的 关键 配置

1.3 kafka-more-json

1.3.2 准备数据

1.3.3 sql-client

1.3.5 采坑



 

序言


近期主要是我花时间一步步采坑实践出来的各种细节,发现官网很多文字和配置都误差. 所以本人本着真实可靠的实践操作来给予大家的文案.希望可以帮到你    .            
做实践之前,必须准备
flink 环境 略        
java 环境 略        
sql-client 开启 略         
docker 环境. 以备各个组件的快速运行.        

一. kafka的数据源 


读取kafka的数据包含三种类型, json ,csv,debezium-json,canel-json 点击 

 1.1 json

1.1.1  flink sql ddl 

CREATE TABLE user_test ( 
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3) ) WITH ( 'connector' = 'kafka',
'topic' = 'user_test',
'properties.group.id' = 'local-sql-test',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'nlocalhost:9200',
'format' = 'json' ,
'json.timestamp-format.standard' = 'SQL',
'scan.topic-partition-discovery.interval'='10000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);

 

1.1.2  数据准备


(1)create topic


./kafka-topics.sh --create  \
--zookeeper  localhost:2181 \
--replication-factor 1 --partitions 1  --topic user_test

(2)producer topic data
 ./kafka-console-producer.sh \
 --broker-list localhost:9200 \
 --topic user_test  

(3) data 
(json 可以调整,key可以缺少,也可以是int类型)这取决于你后面的datatypes的类型
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26 01:00:00.123"}


1.1.3  开启sql-client 


 ./bin/sql-client.sh embedded

(1) 准备sql-client 
mkdir -p  sqlclient  && cd  sqlclient   
flink-json-1.12.0.jar 
flink-sql-connector-kafka_2.11-1.12.0.jar 


参考: 
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.11.1/flink-sql-connector-kafka_2.11-1.12.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.12.0/flink-json-1.12.0.jar

启动可能需要的环境变量参数

create YAML file (sql-env.yaml)
configuration:
  execution.target: yarn-session

bin/sql-client.sh embedded -e sql-env.yaml

或者用其他方式启动

bin/sql-client.sh embedded -s yarn-session


1.1.4 代码

 <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.12.1</version>
        </dependency>

/**
 * @Auther: laraj
 * @Date: 2021/1/28
 * @Description: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
 */
public class FlinkKafkaSource implements Serializable {

    /**
     *
     ./kafka-console-producer.sh \
     --broker-list localhost:9200 \
     --topic user_test
     {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

     */
    public static final String KAFKA_TABLE_SOURCE_DDL = "" +
            "CREATE TABLE user_test (\n" +
            "    user_id BIGINT,\n" +
            "    item_id BIGINT,\n" +
            "    category_id BIGINT,\n" +
            "    behavior STRING,\n" +
            "    `record_time` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',\n" +
            "    ts TIMESTAMP(3) " +
            " \n" +
            ") WITH (\n" +
            "    'connector' = 'kafka', " +
            "    'topic' = 'user_test', " +
            "    'properties.group.id' = 'local-test', " +
            "     'json.fail-on-missing-field' = 'false', " +
            "    'json.ignore-parse-errors' = 'true', " +
            "     'json.timestamp-format.standard'='SQL', " +
            "     'scan.topic-partition-discovery.interval'='10000', " +
            "    'scan.startup.mode' = 'latest-offset',  " +
            "    'properties.bootstrap.servers' = 'localhost:9200', " +
            "    'format' = 'json' " +
            ")  ";


    public static void main(String[] args) {

        //构建StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3,
                Time.of(5, TimeUnit.MINUTES),
                Time.of(10, TimeUnit.SECONDS)
        ));
        //构建EnvironmentSettings 并指定Blink Planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        //构建StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

        tEnv.executeSql(KAFKA_TABLE_SOURCE_DDL);

        //执行查询
        Table table = tEnv.sqlQuery("select * from user_test ");

        //转回DataStream并输出
        DataStream<Row> rowDataStream = tEnv.toAppendStream(table, Row.class);

        rowDataStream.map(new MapFunction<Row, Row>() {
            @Override
            public Row map(Row row) throws Exception {
                System.out.println(row);
                return row;
            }
        }).print("FlinkKafkaSource==").setParallelism(1);
        //任务启动,这行必不可少!
        try {
            env.execute("FlinkKafkaSource");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}


1.1.5 采坑强调
(1)
Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.  
--> pom中少引用了flink-json 


(2)关于数据中想要用timestamp字段类型


 flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
-->sql定义中出现了不能序列化的datatypes.比如timetamps 在mysql中有,但在flinksql中可能序列化不完整. 如果需要的话,必须要 'json.timestamp-format.standard' = 'SQL'的配置.


(3) 关于json的规范问题


'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
目的是为了json缺失或者数据为空的时候,不会抛异常


(4) 最终执行 sql在代码层的时候,还得去掉';' 

这个点我也没有明白,反正去掉分好就是对的

1.1.6 配置


 注意,这和官网的相违背,所以我推断写官网文档的一定不是写源码的commiter

 

这里做一个简单的重点解释 解释.

 
key.fields
key.fields-prefix
key.format
properties.bootstrap.servers
properties.group.id 
property-version 
scan.startup.mode    --'earliest-offset' 'latest-offset' 'group-offsets'  'timestamp' and 'specific-offsets'.
scan.startup.specific-offsets --定向分区开始消费 
scan.startup.timestamp-millis  ---指定开始消费的时间戳
scan.topic-partition-discovery.interval  --发现动态Kafka新加入的partition并设置时间间隔。
sink.parallelism  --指定分区sink 
sink.partitioner   ---指定sink 到kafka的
sink.semantic  -- 'at-least-once' / 'exactly-once'
topic   
topic-pattern   --对于 topic 和topic-partition 只能任选一个 
value.fields-include   --EXCEPT_KEY / ALL 默認ALL ,如果選了 EXCEPT_KEY,可以简单理解成自定义了一下key. 那么还需要添加以下作为补充.   'key.fields-prefix'/ 'key.fields' 

1.2 debeizum-json


1.2.1  flink sql

CREATE TABLE triggers ( 
`trigger_id` BIGINT,
`trigger_source` STRING,
`modify_time` BIGINT ,
`enc_type` TINYINT ,
PRIMARY KEY (`trigger_id`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'azkaban.DEV.azkaban.triggers_test',
'properties.bootstrap.servers' = 'localhost:9200',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'debezium-json',
'scan.topic-partition-discovery.interval'='10000',
'value.debezium-json.ignore-parse-errors'='true',
'value.debezium-json.schema-include'='true'
);

1.2.2 准备数据


  (1) 需要安装debezium /mysql /kafka connect 
详细的介绍了 构建docker-compose 安装着一系列的操作. 细节私我. 
https://github.com/leonardBang/flink-sql-etl/tree/master/flink-demo

  (2) 最后呈现kafka的数据如图所示就算准备好了,这个一个topic name 对于mysql的一张表. 

1.2.3 sql-client

1.2.4 代码 

/**
 * @Auther: laraj
 * @Date: 2021/1/28
 * @Description: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/formats/debezium.html#%E5%A6%82%E4%BD%95%E4%BD%BF%E7%94%A8-debezium-format
 */
public class FlinkKafkaDebeziumSource {
    public static void main(String[] args) {


        String debeziumKafkaJson = "CREATE TABLE triggers ( \n" +
                "  `trigger_id` BIGINT,\n" +
                "  `trigger_source` STRING,\n" +
                "  `modify_time` BIGINT ,\n" +
                "  `enc_type` TINYINT ,\n" +
                "  PRIMARY KEY (`trigger_id`) NOT ENFORCED\n" +
                "  ) WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'azkaban.DEV.azkaban.triggers_test',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9200',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'value.format' = 'debezium-json',\n" +
                "  'scan.topic-partition-discovery.interval'='10000',\n" +
                "  'value.debezium-json.ignore-parse-errors'='true',\n" +
                "  'value.debezium-json.timestamp-format.standard'='SQL' ," +
                "  'value.debezium-json.schema-include'='true' " +
                "  ) ";

        String debeziumJsonMetadata = " CREATE TABLE triggers_data ( \n" +
                "  `schema` STRING NULL  METADATA  FROM 'value.schema' VIRTUAL,\n" +
                "  `ingestion_time` TIMESTAMP(3)   METADATA  FROM 'value.ingestion-timestamp' VIRTUAL,\n" +
                "  `source_time`   TIMESTAMP(3)  METADATA  FROM 'value.source.timestamp' VIRTUAL,\n" +
                "  `source_database` STRING  NULL  METADATA  FROM 'value.source.database'  VIRTUAL,\n" +
                "  `scource_schema`  STRING NULL   METADATA  FROM 'value.source.schema' VIRTUAL,\n" +
                "  `souce_table` STRING NULL    METADATA  FROM 'value.source.table' VIRTUAL,\n" +
                "  `souce_properties` MAP<STRING, STRING> NULL    METADATA  FROM 'value.source.properties' VIRTUAL,\n" +
                "  `trigger_id` BIGINT,\n" +
                "  `trigger_source` STRING,\n" +
                "  `modify_time` BIGINT ,\n" +
                "  `enc_type` TINYINT ,\n" +
                "  PRIMARY KEY (`trigger_id`) NOT ENFORCED\n" +
                "  ) WITH (\n" +
                "   'connector' = 'kafka',\n" +
                "   'topic' = 'azkaban.DEV.azkaban.triggers_test',\n" +
                "   'properties.bootstrap.servers' = 'localhost:9200',\n" +
                "   'scan.startup.mode' = 'latest-offset',\n" +
                "   'value.format' = 'debezium-json',\n" +
                "  'scan.topic-partition-discovery.interval'='10000',\n" +
                "   'value.debezium-json.ignore-parse-errors'='true',\n" +
                "   'value.debezium-json.timestamp-format.standard'='SQL',\n" +
                "   'value.debezium-json.schema-include'='true' " +
                "   ) ";

        //构建StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //构建EnvironmentSettings 并指定Blink Planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        //构建StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

        tEnv.executeSql(debeziumKafkaJson);
        TableResult tableResult = tEnv.executeSql(debeziumJsonMetadata);
//        tableResult.print("--");


        //执行查询 原mysql表的字段.
        Table table = tEnv.sqlQuery("select * from triggers ");
        //转回DataStream并输出
        DataStream<Tuple2<Boolean, Row>> dataStream = tEnv.toRetractStream(table, Row.class);
        dataStream.map(new MapFunction<Tuple2<Boolean, Row>, Row>() {
            @Override
            public Row map(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
//                System.out.println(booleanRowTuple2.f0 + "--" + booleanRowTuple2.f1);
                if (booleanRowTuple2.f0) {
                    return booleanRowTuple2.f1;
                } else {

                }
                return null;
            }
        }).filter(new FilterFunction<Row>() {
            @Override
            public boolean filter(Row row) throws Exception {
                return row != null;
            }
        }).print("triggers ").setParallelism(1);


        Table table_metadata = tEnv.sqlQuery("select schema,souce_properties from triggers_data ");
        //转回DataStream并输出
        DataStream<Tuple2<Boolean, Row>> dataStream_metadata = tEnv.toRetractStream(table_metadata, Row.class);
        dataStream_metadata.map(new MapFunction<Tuple2<Boolean, Row>, Row>() {
            @Override
            public Row map(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
                //System.out.println(booleanRowTuple2.f0 + "--" + booleanRowTuple2.f1);

                if (booleanRowTuple2.f0) {
                    Row value = booleanRowTuple2.f1;

                    return booleanRowTuple2.f1;

                } else {

                }
                return null;
            }
        }).filter(new FilterFunction<Row>() {
            @Override
            public boolean filter(Row row) throws Exception {
                return row != null;
            }
        }).setParallelism(1);


        //任务启动,这行必不可少!
        try {
            env.execute("FlinkKafkaDebeziumSource");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}


1.2.5 采坑 


(1)如果保证一致性 配置:
    默认情况下: 保证 exactly-once  如果想要做到消息不重复 at-least-once  
    操作:
        a. 将全局的作业config设置成:table.exec.source.cdc-events-duplicate 设置成 true.
        Configuration configuration = tEnv.getConfig().getConfiguration();
        configuration.setString("table.exec.source.cdc-events-duplicate", "true");
        b.  并在该 source 上定义 PRIMARY KEY.
              eg: PRIMARY KEY (`trigger_id`) NOT ENFORCED  --不检查主键的唯一性
        

(2)强调   'value.debezium-json.schema-include' 
(flink1.12版本) 和之前不太一样包括官网描述也有误差,如果想要准确读取debezium-json 数据,
 之前是 debezium-json.schema-include 如今是 
      'value.debezium-json.schema-include' = 'true' 此选项表明 Debezium JSON 消息是否包含 schema和payload 


 (3)关于debeizum 的 metadata如何获取?

 CREATE TABLE triggers_data ( 
  `schema` STRING NULL  METADATA  FROM 'value.schema' VIRTUAL,
  `ingestion_time` TIMESTAMP(3)   METADATA  FROM 'value.ingestion-timestamp' VIRTUAL,
  `source_time`   TIMESTAMP(3)  METADATA  FROM 'value.source.timestamp' VIRTUAL,
  `source_database` STRING  NULL  METADATA  FROM 'value.source.database'  VIRTUAL,
  `scourc_schema`  STRING NULL   METADATA  FROM 'value.source.schema' VIRTUAL,
  `souce_table` STRING NULL    METADATA  FROM 'value.source.table' VIRTUAL,
  `souce_properties` MAP<STRING, STRING> NULL    METADATA  FROM 'value.source.properties' VIRTUAL,
  `trigger_id` BIGINT,
  `trigger_source` STRING,
  `modify_time` BIGINT ,
  `enc_type` TINYINT ,
  PRIMARY KEY (`trigger_id`) NOT ENFORCED
  ) WITH (
   'connector' = 'kafka',
   'topic' = 'azkaban.DEV.azkaban.triggers_test',
   'properties.bootstrap.servers' = 'localhost:9200',
   'scan.startup.mode' = 'latest-offset',
   'value.format' = 'debezium-json',
  'scan.topic-partition-discovery.interval'='10000',
   'value.debezium-json.ignore-parse-errors'='true',
   'value.debezium-json.timestamp-format.standard'='SQL',
   'value.debezium-json.schema-include'='true'    ) 

 


1.2.6 用debezium-json的 关键 配置


简单抓重点描述 : 

  1.  format
  2.  value.format
  3. value.debezium-json.ignore-parse-errors 忽略解析错误.
  4. value.debezium-json.map-null-key.literal -- 'debezium-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 
  5. value.debezium-json.map-null-key.mode  --如果出现map数据,key为空的时候,'FAIL': 抛出异常   'DROP':丢弃   'LITERAL':用场量替换
  6. value.debezium-json.schema-include  --默认false,数据中包含schema 和payload等全量,这里需要设置成true 
  7. value.debezium-json.timestamp-format.standard --这个参数感觉还不太好用. SQL '2020-12-30 12:13:14.123'/ 'ISO-8601'  '2020-12-30T12:13:14.123'  

 

1.3 kafka-more-json


1.3.1 flink sql

 CREATE TABLE more_json ( 
   `afterColumns` ROW(`created` STRING,
   `extra` ROW(canGiving BOOLEAN),
      `parameter` ARRAY <INT>),
   `beforeColumns` STRING ,
   `tableVersion` ROW(`binlogFile` STRING,
      `binlogPosition` INT ,
      `version` INT) ,
      `map`     MAP<STRING,BIGINT>,
    `mapinmap`     MAP<STRING,MAP<STRING,INT>>,
   `touchTime` BIGINT
    ) WITH ( 
  'connector' = 'kafka',
  'topic' = 'more_json',
  'properties.group.id' = 'local-sql-test',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = 'localhost:9200',
  'format' = 'json' ,
  'json.timestamp-format.standard' = 'SQL',
  'scan.topic-partition-discovery.interval' = '10000',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true' 
  ) ;

 

1.3.2 准备数据

 

 (1)create topic

./kafka-topics.sh --create  \

--zookeeper  localhost:2181 \ 

--replication-factor 1 \

--partitions 1  --topic more_json

(2)producer data

 ./kafka-console-producer.sh \

 --broker-list localhost:9200   \

 --topic more_json

(3) data 

json 可以调整,key可以缺少,也可以是int类型)这取决于你后面的datatypes的类型 

{
    "afterColumns":{
        "created":"1589186680",
        "extra":{
            "canGiving":false
        },
        "parameter":[
            1,
            2,
            3,
            4
        ]
    },
    "map":{
        "flink":123
    },
    "mapinmap":{
        "inner_map":{
            "key":234,
            "key2":"12345"
        }
    },
    "beforeColumns":null,
    "tableVersion":{
        "binlogFile":null,
        "binlogPosition":0,
        "version":0
    },
    "touchTime":1589186680591
}

1.3.3 sql-client


此处无需添加新的配置和jar

1.3.4 代码 

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.12.0</version>
</dependency>

/**
 * @Auther: laraj
 * @Date: 2021/1/28
 * @Description: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
 */
public class FlinkKafkaSourceObjectJson implements Serializable {
    public static void main(String[] args) {
/**

 ./kafka-topics.sh --create  \
 --zookeeper  localhost:2181 \
 --replication-factor 1 \
 --partitions 1  --topic more_json

 {"afterColumns":{"created":"1589186680","extra":{"canGiving":false},"parameter":[1,2,3,4]},"map":{"flink":123},"mapinmap":{"inner_map":{"key":234,"key2":"12345"}},"beforeColumns":null,"tableVersion":{"binlogFile":null,"binlogPosition":0,"version":0},"touchTime":1589186680591}

 */
        String moreJsonSql=" CREATE TABLE more_json ( \n" +
                "   `afterColumns` ROW(`created` STRING,\n" +
                "   `extra` ROW(canGiving BOOLEAN),\n" +
                "      `parameter` ARRAY <INT>),\n" +
                "   `beforeColumns` STRING ,\n" +
                "   `tableVersion` ROW(`binlogFile` STRING,\n" +
                "      `binlogPosition` INT ,\n" +
                "      `version` INT) ,\n" +
                "       `map`         MAP<STRING,BIGINT>, " +
                "       `mapinmap`      MAP<STRING,MAP<STRING,INT>>, " +
                "   `touchTime` BIGINT\n" +
                "    ) WITH ( \n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'more_json',\n" +
                "  'properties.group.id' = 'local-sql-test',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9200',\n" +
                "  'format' = 'json' ,\n" +
                "  'json.timestamp-format.standard' = 'SQL',\n" +
                "  'scan.topic-partition-discovery.interval' = '10000',\n" +
                "  'json.fail-on-missing-field' = 'false',\n" +
                "  'json.ignore-parse-errors' = 'true' \n" +
                "  ) ";
        //构建StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //构建EnvironmentSettings 并指定Blink Planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        //构建StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

        tEnv.executeSql(moreJsonSql);

        //执行查询
     //   Table table = tEnv.sqlQuery("select * from more_json ");
        Table table = tEnv.sqlQuery(" select afterColumns.`parameter`[1] as parameter1, afterColumns.extra['canGiving'] as canGiving ,  `map`['flink'] as flinkv,`mapinmap`['inner_map']['key2'] as mkey2,tableVersion.binlogFile  from more_json  ");

        //转回DataStream并输出
        DataStream<Row> rowDataStream = tEnv.toAppendStream(table, Row.class);
        rowDataStream.map(new MapFunction<Row, Row>() {
            @Override
            public Row map(Row row) throws Exception {
                System.out.println(row.toString());
                return row;
            }
        });
        //.print("sql==").setParallelism(1);
        //任务启动,这行必不可少!
        try {
            env.execute("test");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

 

1.3.5 采坑


(1) 查询sql的时候,记住flink sql中的数组是从1开始的
eg:查询1 .

 select afterColumns.`parameter`[1] as parameter1, afterColumns.extra['canGiving'] as canGiving ,  `map`['flink'] as flinkv,`mapinmap`['inner_map']['key2'] as mkey2,tableVersion.binlogFile  from  


1.3.6  配置


(1) data types

(2) with 配置


 

 

 类似资料: