FLink学习笔记:11-Flink 的Table API的Connector操作

杜浩壤
2023-12-01

创建表环境

//创建tableSetting
val tableEnvironmentSettings = EnvironmentSettings
  .newInstance()
  .inStreamingMode()
  .build()

//创建tableAPI的环境
val tableEnv = StreamTableEnvironment.create(env,tableEnvironmentSettings)

创建表

TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。它会维护一个 Catalog-Table 表之间的 map。

表(Table)是由一个“标识符”来指定的,由 3 部分组成:Catalog 名、数据库(database)名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。

表可以是常规的(Table,表),或者虚拟的(View,视图)。常规表(Table)一般可以 用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream 转 换而来。

视图可以从现有的表中创建,通常是 table API 或者 SQL查询的一个结果。

从DataStream创建表

//从DataStream 创建表
val dataStream = env.readTextFile("D:\\java_workspace\\hadoop\\FlinkDemo\\src\\main\\resources\\Data\\sensor.txt")
  .filter(_.nonEmpty)
  .map(data=>{
    val array = data.split(",")
    Sensor(array(0),array(1).toLong,array(2).toDouble)
  })

使用tableAPI定义表结构

tableEnv.createTable(
  "sensor",
  TableDescriptor
    .forConnector("kafka")
    .schema(
      Schema
        .newBuilder()
        .column("id", DataTypes.STRING())
        .column("timestamp", DataTypes.BIGINT())
        .column("temperature", DataTypes.DOUBLE())
        .build())
    .format("csv")
    .option("properties.bootstrap.servers","k8s-node3:9092")
    .option("properties.group.id","mygroup")
    .option("topic","mytopic2")
    .option("scan.startup.mode","latest-offset")
    .build()
)

使用SQL创建表结构

FLINK-SQL的数据类型

  • CHAR
  • VARCHAR
  • STRING
  • BOOLEAN
  • BYTES
  • DECIMAL
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • FLOAT
  • DOUBLE
  • DATE
  • TIMESTAMP
  • ARRAY
  • MAP
  • ROW

语法:

val creatSql: String =
  """
    |CREATE TABLE sensor(
    |`id` STRING,
    |`timestamp` BIGINT,
    |`temperature` DOUBLE
    |) WITH(
    |'connector' = 'kafka',
    |'format' = 'csv',
    ……
    |)
    |""".stripMargin

tableEnv.executeSql(creatSql)

Formats:

Flink 提供了一套与表连接器(table connector)一起使用的表格式(table format)。表格式是一种存储格式,定义了如何把二进制数据映射到表的列上。

FormatsSupported Connectors
CSVApache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem
JSONApache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem, Elasticsearch
Apache AvroApache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem
Confluent AvroApache Kafka, Filesystem
Debezium CDCApache Kafka, Filesystem
Canal CDCApache Kafka, Filesystem
Maxwell CDCApache Kafka, Filesystem
OGG CDCApache Kafka, Filesystem
Apache ParquetFilesystem
Apache ORCFilesystem
RawApache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem

CSV格式表

依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-csv</artifactId>
  <version>1.14.4</version>
</dependency>

参数设置:

  • format

    是否必填:是

    默认值:None

    数据类型:String

    说明:声明使用的格式,这里应为’json’。

  • csv.field-delimiter

    是否必填:否

    默认值:“,”

    数据类型:String

    说明:字段分隔符 (默认’,‘),必须为单字符。你可以使用反斜杠字符指定一些特殊字符,例如 ‘\t’ 代表制表符。 你也可以通过 unicode 编码在纯 SQL 文本中指定一些特殊字符,例如 ‘csv.field-delimiter’ = U&’\0001’ 代表 0x01 字符。

  • csv.disable-quote-character

    是否必填:否

    默认值:false

    数据类型:Boolean

    说明:是否禁止对引用的值使用引号 (默认是 false). 如果禁止,选项 ‘csv.quote-character’ 不能设置。

  • csv.quote-character

    是否必填:否

    默认值:"

    数据类型:String

    说明:用于围住字段值的引号字符 (默认").

  • csv.allow-comments

    是否必填:否

    默认值:false

    数据类型:Boolean

    说明:是否允许忽略注释行(默认不允许),注释行以 ‘#’ 作为起始字符。 如果允许注释行,请确保 csv.ignore-parse-errors 也开启了从而允许空行。

  • csv.ignore-parse-errors

    是否必填:否

    默认值:false

    数据类型:Boolean

    说明:当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null

  • csv.array-element-delimiter

    是否必填:否

    默认值:;

    数据类型:String

    说明:分割数组和行元素的字符默认是;

  • csv.escape-character

    是否必填:否

    默认值:None

    数据类型:String

    说明:转义字符,默认关闭

  • csv.null-literal

    是否必填:否

    默认值:None

    数据类型:String

    说明:是否将 “null” 字符串转化为 null 值

JSON格式表

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.14.4</version>
</dependency>

参数设置:

  • format

    是否必填:是

    默认值:None

    数据类型:String

    说明:声明使用的格式,这里应为’json’。

  • json.fail-on-missing-field

    是否必填:否

    默认值:false

    数据类型:Boolean

    说明:当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。

  • json.ignore-parse-errors

    是否必填:否

    默认值:false

    数据类型:Boolean

    说明:当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。

  • json.timestamp-format.standard

    是否必填:否

    默认值:‘SQL’

    数据类型:String

    说明:声明输入和输出的 TIMESTAMP 和 TIMESTAMP_LTZ 的格式。当前支持的格式为’SQL’ 以及 ‘ISO-8601’:可选参数 ‘SQL’ 将会以 “yyyy-MM-dd HH:mm:ss.s{precision}” 的格式解析 TIMESTAMP, 例如 “2020-12-30 12:13:14.123”, 以 “yyyy-MM-dd HH:mm:ss.s{precision}‘Z’” 的格式解析 TIMESTAMP_LTZ, 例如 “2020-12-30 12:13:14.123Z” 且会以相同的格式输出。可选参数 ‘ISO-8601’ 将会以 “yyyy-MM-ddTHH:mm:ss.s{precision}” 的格式解析输入 TIMESTAMP, 例如 “2020-12-30T12:13:14.123” , 以 “yyyy-MM-ddTHH:mm:ss.s{precision}‘Z’” 的格式解析 TIMESTAMP_LTZ, 例如 “2020-12-30T12:13:14.123Z” 且会以相同的格式输出。

  • json.map-null-key.mode

    是否必填:否

    默认值:‘FAIL’

    数据类型:String

    说明:指定处理 Map 中 key 值为空的方法. 当前支持的值有 ‘FAIL’, ‘DROP’ 和 ‘LITERAL’:

    • Option ‘FAIL’ 将抛出异常,如果遇到 Map 中 key 值为空的数据。
    • Option ‘DROP’ 将丢弃 Map 中 key 值为空的数据项。
    • Option ‘LITERAL’ 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 ‘json.map-null-key.literal’ 定义。
  • json.map-null-key.literal

    是否必填:否

    默认值:'‘null’

    数据类型:String

    说明:当 ‘json.map-null-key.mode’ 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。

  • json.encode.decimal-as-plain-number

    是否必填:否

    默认值:false

    数据类型:Boolean

    说明:将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。

Raw 格式表

参数设置:

  • format

    是否必填:是

    默认值:None

    数据类型:String

    说明:声明使用的格式,这里应为’raw’。

  • raw.charset

    是否必填:是

    默认值:UTF-8

    数据类型:String

    说明:指定字符集来编码文本字符串。

  • raw.endianness

    是否必填:是

    默认值:big-endian

    数据类型:String

    说明:指定字节序来编码数字值的字节。有效值为’big-endian’和’little-endian’。

Orc 格式表

依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-orc</artifactId>
  <version>1.14.4</version>
</dependency>

参数设置:

  • format

    是否必填:是

    默认值:None

    数据类型:String

    说明:声明使用的格式,这里应为’Orc’。

使用SQL在kafka中创建表

依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>1.14.4</version>
</dependency>

SQL语句

val creatSql: String =
  """
    |CREATE TABLE sensor(
    |`id` STRING,
    |`timestamp` BIGINT,
    |`temperature` DOUBLE
    |) WITH(
    |'connector' = 'kafka',
    |'topic' = 'mytopic2',
    |'format' = 'csv',
    |'csv.ignore-parse-errors' = 'true',
    |'csv.allow-comments' = 'true',
    |'properties.bootstrap.servers' = 'k8s-node3:9092',
    |'properties.group.id' = 'mygroup',
    |'scan.startup.mode' = 'latest-offset'
    |)
    |""".stripMargin
println(creatSql)
tableEnv.executeSql(creatSql)

kafka参数说明

  • connector
    是否必填:是

    默认值

    数据类型:String

    说明:指定使用的连接器,Kafka 连接器使用 ‘kafka’。

  • topic
    是否必填:是

    数据类型:String

    说明:Kafka 记录的 Topic 名。

  • topic-pattern
    是否必填:否

    数据类型:String

    说明:匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。注意,对 source 表而言,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。

  • properties.bootstrap.servers
    是否必填:是

    数据类型:String

    说明:逗号分隔的 Kafka broker 列表。如:k8s-node3:9092

  • properties.group.id
    是否必填:对 source 可选,不适用于 sink

    数据类型:String

    说明:Kafka source 的消费组 id。如果未指定消费组 ID,则会使用自动生成的 “KafkaSource-{tableIdentifier}” 作为消费组 ID。

  • format
    是否必填:是

    数据类型:String

    说明:用来序列化或反序列化 Kafka 消息的格式。具体取值见 Format说明。

  • key.format
    是否必填:否

    数据类型:String

    说明:用来序列化和反序列化 Kafka 消息键(Key)的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:如果定义了键格式,则配置项 ‘key.fields’ 也是必需的。 否则 Kafka 记录将使用空值作为键

  • key.fields
    是否必填:否

    数据类型:List

    说明:表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。 列表格式为 ‘field1;field2’。

  • key.fields-prefix
    是否必填:否

    数据类型:String

    说明:为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。

  • value.format
    是否必填:否

    数据类型:String

    说明:序列化和反序列化 Kafka 消息体时使用的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 ‘format’ 二者必需其一。

  • value.fields-include
    是否必填:否,默认是ALL

    数据类型:枚举类型,可选值:[ALL, EXCEPT_KEY]

    说明:定义消息体(Value)格式如何处理消息键(Key)字段的策略。 默认情况下,表结构中 ‘ALL’ 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。

  • scan.startup.mode
    是否必填:否,默认是group-offsets

    数据类型:String

    说明:Kafka consumer 的启动模式。有效值为:‘earliest-offset’,‘latest-offset’,‘group-offsets’,‘timestamp’ 和 ‘specific-offsets’。

  • scan.startup.specific-offsets
    是否必填:否

    数据类型:String

    说明:在使用 ‘specific-offsets’ 启动模式时为每个 partition 指定 offset,例如’partition:0,offset:42;partition:1,offset:300’。

  • scan.startup.timestamp-millis
    是否必填:否

    数据类型:Long

    说明:在使用 ‘timestamp’ 启动模式时指定启动的时间戳(单位毫秒)

  • scan.topic-partition-discovery.interval
    是否必填:否

    数据类型:Duration

    说明:Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。

  • sink.partitioner
    是否必填:否,默认值是default

    数据类型:String

    说明:Flink partition 到 Kafka partition 的分区映射关系,可选值有:

    • default:使用 Kafka 默认的分区器对消息进行分区。
    • fixed:每个 Flink partition 最终对应最多一个 Kafka partition。
    • round-robin:Flink partition 按轮循(round-robin)的模式对应到 Kafka partition。只有当未指定消息的消息键时生效。
    • 自定义 FlinkKafkaPartitioner 的子类:例如 ‘org.mycompany.MyPartitioner’
  • sink.semantic
    是否必填:否,默认值是at-least-once

    数据类型:String

    说明:定义 Kafka sink 的语义。有效值为 ‘at-least-once’,‘exactly-once’ 和 ‘none’。

  • sink.semantic
    是否必填:否

    数据类型:Integer

    说明:定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。

val creatSql: String =
    """
      |CREATE TABLE sensor(
      |`id` STRING,
      |`timestamp` BIGINT,
      |`temperature` DOUBLE
      |) WITH(
      |'connector' = 'filesystem',
      |'path' = 'file:///path/to/whatever',
      |'format' = 'csv',
      |'csv.allow-comments' = 'true',
      |'csv.ignore-parse-errors' = 'true',
      |'partition.default-name' = 'test-filesystem-table',
      |)
      |""".stripMargin
tableEnv.executeSql(creatSql)

使用SQL在Elasticsearch中创建表

val creatSql: String =
    """
      |CREATE TABLE sensor(
      |`id` STRING,
      |`timestamp` BIGINT,
      |`temperature` DOUBLE
      |) WITH(
      |'connector' = 'elasticsearch-7',
      |'hosts' = 'http://localhost:9200',
      |'index' = 'sensor'
      |)
      |""".stripMargin
tableEnv.executeSql(creatSql)

使用SQL创建JDBC表

依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.14.4</version>
</dependency>

SQL语句:

val creatSql: String =
    """
      |CREATE TABLE sensor(
      |`id` STRING,
      |`timestamp` BIGINT,
      |`temperature` DOUBLE
      |) WITH(
      |'connector' = 'jdbc',
      |'url' = 'jdbc:mysql://localhost:3306/mydatabase',
      |'table-name' = 'sensor'
      |)
      |""".stripMargin
tableEnv.executeSql(creatSql)

示例:从文件格式表中读取数据,做统计后写入到mysql中

package com.hjt.yxh.hw.tableapitest
import org.apache.flink.table.api._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Schema, TableDescriptor}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FileInput {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tableEnv = StreamTableEnvironment.create(env)

    //從文件中創建一張表
    tableEnv.createTemporaryTable("sensor",
      TableDescriptor
        .forConnector("filesystem")
        .schema(
          Schema
            .newBuilder()
            .column("id", DataTypes.STRING())
            .column("timestamp", DataTypes.BIGINT())
            .column("temper",DataTypes.DOUBLE())
            .build()
        )
        .format("csv")
        .option("path","D:\\LearnWorkSpace\\FlinkDemo\\src\\main\\resources\\Data\\sensor.txt")
        .option("csv.ignore-parse-errors","true")
        .build()
    )

    //從文件中創建一張表
    tableEnv.createTemporaryTable("sensor_result",
      TableDescriptor
        .forConnector("jdbc")
        .schema(
          Schema
            .newBuilder()
            .column("id",DataTypes.STRING().notNull())
            .column("cnt", DataTypes.BIGINT())
            .primaryKey("id")
            .build()
        )
        .option("url","jdbc:mysql://127.0.0.1:3306/news?useSSL=false ")
        .option("username","root")
        .option("password","mysqlroot")
        .option("table-name","sensor_result")
        .build()
    )
//使用SQL方式
//    val jdbcSql =
//      """
//        |CREATE TABLE sensor_result2 (
//        |  id STRING NOT NULL,
//        |  cnt BIGINT NOT NULL,
//        |  PRIMARY KEY (id) NOT ENFORCED
//        |) WITH (
//        |   'connector' = 'jdbc',
//        |   'url' = 'jdbc:mysql://localhost:3306/news?useSSL=false',
//        |   'table-name' = 'sensor_result',
//        |   'username' = 'root',
//        |   'password' = 'mysqlroot'
//        |)
//      """.stripMargin
//    tableEnv.executeSql(jdbcSql)

    val sensorTable = tableEnv.sqlQuery("select * from sensor")

    val aggTable = sensorTable
        .groupBy($"id")
        .select($"id",$"id".count() as "count")
        .executeInsert("sensor_result")

    tableEnv.toDataStream(sensorTable).print()
//    env.execute()
  }
}

完整示例从Kafka读取数据,做转换后,将结果插入到kafka

maven依賴:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.14.4</version>
    <!--<scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.14.4</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.14.4</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.14.4</version>
</dependency>

代码:

package com.hjt.yxh.hw.tableapitest

import com.hjt.yxh.hw.apitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table, TableResult}

object HelloWorld {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    env.setMaxParallelism(1)

    val tableSetting = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .withBuiltInCatalogName("zealink")
      .build()

    //创建table Env
    val tableEnv = StreamTableEnvironment.create(env, tableSetting)

    //创建表
    tableEnv.createTable(
      "sensor",
      TableDescriptor
        .forConnector("kafka")
        .schema(
          Schema
            .newBuilder()
            .column("id", DataTypes.STRING())
            .column("timestamp", DataTypes.BIGINT())
            .column("temperature", DataTypes.DOUBLE())
            .build())
        .format("csv")
        .option("properties.bootstrap.servers","k8s-node3:9092")
        .option("properties.group.id","mygroup")
        .option("topic","mytopic2")
        .option("scan.startup.mode","latest-offset")
        .build()
    )

    val dataStream = env
      .readTextFile(
        "D:\\LearnWorkSpace\\FlinkDemo\\src\\main\\resources\\Data\\sensor.txt")
      .map(data => {
        val array = data.split(",")
        SensorReading(array(0), array(1).toLong, array(2).toDouble)
      })

    val sourceTable = tableEnv.from("sensor")
    val result = sourceTable
      .select($"id",  $"temperature")
      .filter($"id" === "sensor_1")


    //创建目标表(写入到文件)
    val targetTable = tableEnv.createTable("result",
      TableDescriptor
        .forConnector("filesystem")
        .schema(Schema.newBuilder()
          .column("id",DataTypes.STRING())
          .column("temperature",DataTypes.DOUBLE())
          .build())
        .format("csv")
        .option("path","D:\\LearnWorkSpace\\FlinkDemo\\src\\main\\resources\\out")
        .build()
    )

    val kafkaTargetTable = tableEnv.createTable("result_kafka",
      TableDescriptor.forConnector("kafka")
        .schema(Schema.newBuilder()
          .column("id",DataTypes.STRING())
          .column("temperature",DataTypes.DOUBLE())
          .build())
        .format("json")
        .option("properties.bootstrap.servers","k8s-node3:9092,k8s-node5:9092,k8s-node8:9092")
        .option("topic","result_topic")
        .build()
    )

    //执行插入操作
    result.executeInsert("result")
    result.executeInsert("result_kafka")
    
    val resultStream = tableEnv.toDataStream(result)
    resultStream.print()
    env.execute()
  }
}

 类似资料: