FlinkSql读取Pulsar中的Json结构数据并保存

上官兴昌
2023-12-01


前言

Flink和Pulsar是当前大数据领域常用的组件,它们的优势和特点在此不再赘述,可参考Flink官网和Pulsar官网。
我使用的Flink版本为1.12,Pulsar版本为2.9.1。
此文章的背景为Canal采集MySQL中的binlog数据,写入Pulsar,由Flink解析Pulsar中的Json数据,写入到存储中。


一、确定写入pulsar中的数据结构

在Pulsar的安装目录下执行以下命令:
查看最早的一条数据:

# Read from the topic with subscription "first-subscription", using Earliest as the initial position
# (presumably only applied when the subscription is first created), to see the oldest retained message.
bin/pulsar-client consume -s "first-subscription" --subscription-position Earliest persistent://public/default/yourtopicname

查看最新的一条数据:

# Use a different subscription name than the Earliest command above: an existing subscription
# resumes from its own cursor, so reusing "first-subscription" would not start at the newest message.
# With Latest as the initial position the consumer waits for the next message published to the topic.
bin/pulsar-client consume -s "latest-subscription" --subscription-position Latest persistent://public/default/yourtopicname

二、分析Pulsar中的数据结构

对第一步中获取到的数据内容进行解析。

可在 https://www.json.cn/ 中对其进行格式化输出。
输出结果如下:
data 中为实际的行数据,mysqlType 为各字段在 MySQL 中的数据类型。

{
    "data":[
        {
            "wp_web_page_sk":"60",
            "wp_web_page_id":"AAAAAAAAKDAAAAAA",
            "wp_rec_start_date":"2001-09-03",
            "wp_rec_end_date":"0000-00-00",
            "wp_creation_date_sk":"2450813",
            "wp_access_date_sk":"2452566",
            "wp_autogen_flag":"Y",
            "wp_customer_sk":"80555",
            "wp_url":"http://www.foo.com",
            "wp_type":"welcome",
            "wp_char_count":"6577",
            "wp_link_count":"24",
            "wp_image_count":"2",
            "wp_max_ad_count":"3"
        }
    ],
    "database":"tpcds_01",
    "es":1656468147000,
    "id":152155,
    "isDdl":false,
    "mysqlType":{
        "wp_web_page_sk":"int(11)",
        "wp_web_page_id":"char(16)",
        "wp_rec_start_date":"date",
        "wp_rec_end_date":"date",
        "wp_creation_date_sk":"int(11)",
        "wp_access_date_sk":"int(11)",
        "wp_autogen_flag":"char(1)",
        "wp_customer_sk":"int(11)",
        "wp_url":"varchar(100)",
        "wp_type":"char(50)",
        "wp_char_count":"int(11)",
        "wp_link_count":"int(11)",
        "wp_image_count":"int(11)",
        "wp_max_ad_count":"int(11)"
    },
    "old":null,
    "pkNames":[
        "wp_web_page_sk"
    ],
    "sql":"",
    "sqlType":{
        "wp_web_page_sk":4,
        "wp_web_page_id":1,
        "wp_rec_start_date":91,
        "wp_rec_end_date":91,
        "wp_creation_date_sk":4,
        "wp_access_date_sk":4,
        "wp_autogen_flag":1,
        "wp_customer_sk":4,
        "wp_url":12,
        "wp_type":1,
        "wp_char_count":4,
        "wp_link_count":4,
        "wp_image_count":4,
        "wp_max_ad_count":4
    },
    "table":"web_page",
    "ts":1656468638479,
    "type":"INSERT"
}

三、写FlinkSql解析Json数据

1、创建source端

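要注意 data 字段的格式和写法:它被声明为 ARRAY<ROW<...>>,与 JSON 中的对象数组一一对应。之后在写入时通过 UNNEST 将数组中的每个元素展开为一行,本质上是把一条消息拆分为多行的过程。
scan.startup.sub-start-offset(订阅的消费起始位置)的配置可参考 GitHub 上 Pulsar Flink 连接器的源码,因为这部分内容目前在 Flink 官网上还无法找到。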

-- Source table over the Canal JSON messages stored in the Pulsar topic.
-- Each message carries a `data` array of row images plus message-level metadata.
-- JSON keys not declared below (mysqlType, sqlType, pkNames, old, ...) are not mapped to columns.
-- The row fields arrive as JSON strings (e.g. "60") but several are declared BIGINT.
-- NOTE(review): this relies on the Flink JSON format parsing numeric text into BIGINT, confirm for your version.
CREATE TABLE ods_tpcds_01_web_page_rt_source
(
    `data` ARRAY<ROW<
        wp_web_page_sk      BIGINT,
        wp_web_page_id      STRING,
        wp_rec_start_date   STRING,
        wp_rec_end_date     STRING,
        wp_creation_date_sk BIGINT,
        wp_access_date_sk   BIGINT,
        wp_autogen_flag     STRING,
        wp_customer_sk      BIGINT,
        wp_url              STRING,
        wp_type             STRING,
        wp_char_count       BIGINT,
        wp_link_count       BIGINT,
        wp_image_count      BIGINT,
        wp_max_ad_count     BIGINT
    >>,
    `database` STRING,
    -- isDdl is a JSON boolean in the sample message, read here as text
    `isDdl`    STRING,
    `table`    STRING,
    `type`     STRING,
    `es`       BIGINT,
    `ts`       BIGINT
) WITH (
    'connector'                     = 'pulsar',
    'topic'                         = 'persistent://public/default/tpcds_01_web_page',
    'service-url'                   = 'pulsar://xx.xx.xx.xx:6650',
    'admin-url'                     = 'http://xx.xx.xx.xx:8080',
    -- consume through the named subscription below, starting at the earliest offset
    'scan.startup.mode'             = 'external-subscription',
    'scan.startup.sub-name'         = 'ods_tpcds_01_web_page_rt_v1',
    'scan.startup.sub-start-offset' = 'earliest',
    'format'                        = 'json'
);

2、创建sink端

sink.rolling-policy.rollover-interval、sink.rolling-policy.file-size 等参数的含义可参考 Flink 官网 FileSystem 连接器文档中的解释,并根据需要自行修改。注意作业需要开启 checkpoint,写出的文件才会在 checkpoint 完成后变为可读的正式文件。
官网地址

-- Parquet sink on a filesystem (HDFS or S3-compatible storage such as MinIO), partitioned by `pt`.
-- NOTE(review): per the Flink filesystem connector docs, part files only become finished
-- after a successful checkpoint, so enable execution.checkpointing.interval for this job,
-- otherwise no readable files will appear under the path.
CREATE TABLE ods_tpcds_01_web_page_rt_sink
(
    wp_web_page_sk BIGINT,
	wp_web_page_id STRING,
	wp_rec_start_date STRING,
	wp_rec_end_date STRING,
	wp_creation_date_sk BIGINT,
	wp_access_date_sk BIGINT,
	wp_autogen_flag STRING,
	wp_customer_sk BIGINT,
	wp_url STRING,
	wp_type STRING,
	wp_char_count BIGINT,
	wp_link_count BIGINT,
	wp_image_count BIGINT,
	wp_max_ad_count BIGINT,
	`database` STRING,
	`isDdl` STRING,
	`type` STRING,
	`table` STRING,
	`es` BIGINT,
	`ts` BIGINT,
	`pt` STRING
) PARTITIONED BY (pt) WITH (
      'connector' = 'filesystem',
      -- Replace with your own storage location, for example
      -- s3a://bucket1/test/ods_poc_tpcds_01_web_page_rt_v1 for S3 or MinIO, or
      -- hdfs://test/ods_poc_tpcds_01_web_page_rt_v1 for HDFS.
      'path' = 'hdfs://test/ods_poc_tpcds_01_web_page_rt_v1',
      -- roll a new part file at least every minute or once it reaches 128 MB
      'sink.rolling-policy.rollover-interval' = '1min',
      'sink.rolling-policy.file-size' = '128M',
      'format' = 'parquet'
);

3、写入数据

-- Flatten each Canal message: every element of the `data` array becomes one output row,
-- with the message-level metadata repeated on each row. A message whose `data` array
-- is empty produces no rows under CROSS JOIN UNNEST.
INSERT INTO ods_tpcds_01_web_page_rt_sink
SELECT
    page.wp_web_page_sk,
    page.wp_web_page_id,
    page.wp_rec_start_date,
    page.wp_rec_end_date,
    page.wp_creation_date_sk,
    page.wp_access_date_sk,
    page.wp_autogen_flag,
    page.wp_customer_sk,
    page.wp_url,
    page.wp_type,
    page.wp_char_count,
    page.wp_link_count,
    page.wp_image_count,
    page.wp_max_ad_count,
    src.`database`,
    src.`isDdl`,
    src.`type`,
    src.`table`,
    src.`es`,
    src.`ts`,
    -- ts is epoch milliseconds (13 digits in the sample), so /1000 gives seconds.
    -- The fixed +8h shifts the instant to UTC+8 before taking the yyyy-MM-dd day.
    -- NOTE(review): FROM_UNIXTIME formats in the session time zone, so this offset is
    -- only correct when that zone is UTC, confirm for your cluster.
    FROM_UNIXTIME((src.`ts` / 1000) + 60 * 60 * 8, 'yyyy-MM-dd') AS pt
FROM ods_tpcds_01_web_page_rt_source AS src
CROSS JOIN UNNEST(src.`data`) AS page (
    wp_web_page_sk,
    wp_web_page_id,
    wp_rec_start_date,
    wp_rec_end_date,
    wp_creation_date_sk,
    wp_access_date_sk,
    wp_autogen_flag,
    wp_customer_sk,
    wp_url,
    wp_type,
    wp_char_count,
    wp_link_count,
    wp_image_count,
    wp_max_ad_count
);

总结

FlinkSql解析Json的步骤基本相似,主要是对Pulsar中Json结构的分析和创建对应的Flink表。如有描述不当,烦请指正。

 类似资料: