//创建tableSetting
val tableEnvironmentSettings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
//创建tableAPI的环境
val tableEnv = StreamTableEnvironment.create(env,tableEnvironmentSettings)
TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。它会维护一个 Catalog-Table 表之间的 map。
表(Table)是由一个“标识符”来指定的,由 3 部分组成:Catalog 名、数据库(database)名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。
表可以是常规的(Table,表),或者虚拟的(View,视图)。常规表(Table)一般可以 用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream 转 换而来。
视图可以从现有的表中创建,通常是 table API 或者 SQL查询的一个结果。
//从DataStream 创建表
val dataStream = env.readTextFile("D:\\java_workspace\\hadoop\\FlinkDemo\\src\\main\\resources\\Data\\sensor.txt")
.filter(_.nonEmpty)
.map(data=>{
val array = data.split(",")
Sensor(array(0),array(1).toLong,array(2).toDouble)
})
tableEnv.createTable(
"sensor",
TableDescriptor
.forConnector("kafka")
.schema(
Schema
.newBuilder()
.column("id", DataTypes.STRING())
.column("timestamp", DataTypes.BIGINT())
.column("temperature", DataTypes.DOUBLE())
.build())
.format("csv")
.option("properties.bootstrap.servers","k8s-node3:9092")
.option("properties.group.id","mygroup")
.option("topic","mytopic2")
.option("scan.startup.mode","latest-offset")
.build()
)
val creatSql: String =
"""
|CREATE TABLE sensor(
|`id` STRING,
|`timestamp` BIGINT,
|`temperature` DOUBLE
|) WITH(
|'connector' = 'kafka',
|'format' = 'csv',
……
|)
|""".stripMargin
tableEnv.executeSql(creatSql)
Flink 提供了一套与表连接器(table connector)一起使用的表格式(table format)。表格式是一种存储格式,定义了如何把二进制数据映射到表的列上。
Formats | Supported Connectors |
---|---|
CSV | Apache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem |
JSON | Apache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem, Elasticsearch |
Apache Avro | Apache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem |
Confluent Avro | Apache Kafka, Filesystem |
Debezium CDC | Apache Kafka, Filesystem |
Canal CDC | Apache Kafka, Filesystem |
Maxwell CDC | Apache Kafka, Filesystem |
OGG CDC | Apache Kafka, Filesystem |
Apache Parquet | Filesystem |
Apache ORC | Filesystem |
Raw | Apache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem |
依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.14.4</version>
</dependency>
参数设置:
format
是否必填:是
默认值:None
数据类型:String
说明:声明使用的格式,这里应为’json’。
csv.field-delimiter
是否必填:否
默认值:“,”
数据类型:String
说明:字段分隔符 (默认’,‘),必须为单字符。你可以使用反斜杠字符指定一些特殊字符,例如 ‘\t’ 代表制表符。 你也可以通过 unicode 编码在纯 SQL 文本中指定一些特殊字符,例如 ‘csv.field-delimiter’ = U&’\0001’ 代表 0x01 字符。
csv.disable-quote-character
是否必填:否
默认值:false
数据类型:Boolean
说明:是否禁止对引用的值使用引号 (默认是 false). 如果禁止,选项 ‘csv.quote-character’ 不能设置。
csv.quote-character
是否必填:否
默认值:"
数据类型:String
说明:用于围住字段值的引号字符 (默认").
csv.allow-comments
是否必填:否
默认值:false
数据类型:Boolean
说明:是否允许忽略注释行(默认不允许),注释行以 ‘#’ 作为起始字符。 如果允许注释行,请确保 csv.ignore-parse-errors 也开启了从而允许空行。
csv.ignore-parse-errors
是否必填:否
默认值:false
数据类型:Boolean
说明:当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
csv.array-element-delimiter
是否必填:否
默认值:;
数据类型:String
说明:分割数组和行元素的字符默认是;
csv.escape-character
是否必填:否
默认值:None
数据类型:String
说明:转义字符,默认关闭
csv.null-literal
是否必填:否
默认值:None
数据类型:String
说明:是否将 “null” 字符串转化为 null 值
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.14.4</version>
</dependency>
参数设置:
format
是否必填:是
默认值:None
数据类型:String
说明:声明使用的格式,这里应为’json’。
json.fail-on-missing-field
是否必填:否
默认值:false
数据类型:Boolean
说明:当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。
json.ignore-parse-errors
是否必填:否
默认值:false
数据类型:Boolean
说明:当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。
json.timestamp-format.standard
是否必填:否
默认值:‘SQL’
数据类型:String
说明:声明输入和输出的 TIMESTAMP 和 TIMESTAMP_LTZ 的格式。当前支持的格式为’SQL’ 以及 ‘ISO-8601’:可选参数 ‘SQL’ 将会以 “yyyy-MM-dd HH:mm:ss.s{precision}” 的格式解析 TIMESTAMP, 例如 “2020-12-30 12:13:14.123”, 以 “yyyy-MM-dd HH:mm:ss.s{precision}‘Z’” 的格式解析 TIMESTAMP_LTZ, 例如 “2020-12-30 12:13:14.123Z” 且会以相同的格式输出。可选参数 ‘ISO-8601’ 将会以 “yyyy-MM-ddTHH:mm:ss.s{precision}” 的格式解析输入 TIMESTAMP, 例如 “2020-12-30T12:13:14.123” , 以 “yyyy-MM-ddTHH:mm:ss.s{precision}‘Z’” 的格式解析 TIMESTAMP_LTZ, 例如 “2020-12-30T12:13:14.123Z” 且会以相同的格式输出。
json.map-null-key.mode
是否必填:否
默认值:‘FAIL’
数据类型:String
说明:指定处理 Map 中 key 值为空的方法. 当前支持的值有 ‘FAIL’, ‘DROP’ 和 ‘LITERAL’:
json.map-null-key.literal
是否必填:否
默认值:'‘null’
数据类型:String
说明:当 ‘json.map-null-key.mode’ 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。
json.encode.decimal-as-plain-number
是否必填:否
默认值:false
数据类型:Boolean
说明:将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。
参数设置:
format
是否必填:是
默认值:None
数据类型:String
说明:声明使用的格式,这里应为’raw’。
raw.charset
是否必填:是
默认值:UTF-8
数据类型:String
说明:指定字符集来编码文本字符串。
raw.endianness
是否必填:是
默认值:big-endian
数据类型:String
说明:指定字节序来编码数字值的字节。有效值为’big-endian’和’little-endian’。
依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-orc</artifactId>
<version>1.14.4</version>
</dependency>
参数设置:
format
是否必填:是
默认值:None
数据类型:String
说明:声明使用的格式,这里应为’Orc’。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.14.4</version>
</dependency>
val creatSql: String =
"""
|CREATE TABLE sensor(
|`id` STRING,
|`timestamp` BIGINT,
|`temperature` DOUBLE
|) WITH(
|'connector' = 'kafka',
|'topic' = 'mytopic2',
|'format' = 'csv',
|'csv.ignore-parse-errors' = 'true',
|'csv.allow-comments' = 'true',
|'properties.bootstrap.servers' = 'k8s-node3:9092',
|'properties.group.id' = 'mygroup',
|'scan.startup.mode' = 'latest-offset'
|)
|""".stripMargin
println(creatSql)
tableEnv.executeSql(creatSql)
connector
是否必填:是
默认值:
数据类型:String
说明:指定使用的连接器,Kafka 连接器使用 ‘kafka’。
topic
是否必填:是
数据类型:String
说明:Kafka 记录的 Topic 名。
topic-pattern
是否必填:否
数据类型:String
说明:匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。注意,对 source 表而言,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。
properties.bootstrap.servers
是否必填:是
数据类型:String
说明:逗号分隔的 Kafka broker 列表。如:k8s-node3:9092
properties.group.id
是否必填:对 source 可选,不适用于 sink
数据类型:String
说明:Kafka source 的消费组 id。如果未指定消费组 ID,则会使用自动生成的 “KafkaSource-{tableIdentifier}” 作为消费组 ID。
format
是否必填:是
数据类型:String
说明:用来序列化或反序列化 Kafka 消息的格式。具体取值见 Format说明。
key.format
是否必填:否
数据类型:String
说明:用来序列化和反序列化 Kafka 消息键(Key)的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:如果定义了键格式,则配置项 ‘key.fields’ 也是必需的。 否则 Kafka 记录将使用空值作为键
key.fields
是否必填:否
数据类型:List
说明:表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。 列表格式为 ‘field1;field2’。
key.fields-prefix
是否必填:否
数据类型:String
说明:为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
value.format
是否必填:否
数据类型:String
说明:序列化和反序列化 Kafka 消息体时使用的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 ‘format’ 二者必需其一。
value.fields-include
是否必填:否,默认是ALL
数据类型:枚举类型,可选值:[ALL, EXCEPT_KEY]
说明:定义消息体(Value)格式如何处理消息键(Key)字段的策略。 默认情况下,表结构中 ‘ALL’ 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。
scan.startup.mode
是否必填:否,默认是group-offsets
数据类型:String
说明:Kafka consumer 的启动模式。有效值为:‘earliest-offset’,‘latest-offset’,‘group-offsets’,‘timestamp’ 和 ‘specific-offsets’。
scan.startup.specific-offsets
是否必填:否
数据类型:String
说明:在使用 ‘specific-offsets’ 启动模式时为每个 partition 指定 offset,例如’partition:0,offset:42;partition:1,offset:300’。
scan.startup.timestamp-millis
是否必填:否
数据类型:Long
说明:在使用 ‘timestamp’ 启动模式时指定启动的时间戳(单位毫秒)
scan.topic-partition-discovery.interval
是否必填:否
数据类型:Duration
说明:Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。
sink.partitioner
是否必填:否,默认值是default
数据类型:String
说明:Flink partition 到 Kafka partition 的分区映射关系,可选值有:
sink.semantic
是否必填:否,默认值是at-least-once
数据类型:String
说明:定义 Kafka sink 的语义。有效值为 ‘at-least-once’,‘exactly-once’ 和 ‘none’。
sink.semantic
是否必填:否
数据类型:Integer
说明:定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。
val creatSql: String =
"""
|CREATE TABLE sensor(
|`id` STRING,
|`timestamp` BIGINT,
|`temperature` DOUBLE
|) WITH(
|'connector' = 'filesystem',
|'path' = 'file:///path/to/whatever',
|'format' = 'csv',
|'csv.allow-comments' = 'true',
|'csv.ignore-parse-errors' = 'true',
|'partition.default-name' = 'test-filesystem-table',
|)
|""".stripMargin
tableEnv.executeSql(creatSql)
val creatSql: String =
"""
|CREATE TABLE sensor(
|`id` STRING,
|`timestamp` BIGINT,
|`temperature` DOUBLE
|) WITH(
|'connector' = 'elasticsearch-7',
|'hosts' = 'http://localhost:9200',
|'index' = 'sensor'
|)
|""".stripMargin
tableEnv.executeSql(creatSql)
依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.14.4</version>
</dependency>
SQL语句:
val creatSql: String =
"""
|CREATE TABLE sensor(
|`id` STRING,
|`timestamp` BIGINT,
|`temperature` DOUBLE
|) WITH(
|'connector' = 'jdbc',
|'url' = 'jdbc:mysql://localhost:3306/mydatabase',
|'table-name' = 'sensor'
|)
|""".stripMargin
tableEnv.executeSql(creatSql)
示例:从文件格式表中读取数据,做统计后写入到mysql中
package com.hjt.yxh.hw.tableapitest
import org.apache.flink.table.api._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Schema, TableDescriptor}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object FileInput {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
//從文件中創建一張表
tableEnv.createTemporaryTable("sensor",
TableDescriptor
.forConnector("filesystem")
.schema(
Schema
.newBuilder()
.column("id", DataTypes.STRING())
.column("timestamp", DataTypes.BIGINT())
.column("temper",DataTypes.DOUBLE())
.build()
)
.format("csv")
.option("path","D:\\LearnWorkSpace\\FlinkDemo\\src\\main\\resources\\Data\\sensor.txt")
.option("csv.ignore-parse-errors","true")
.build()
)
//從文件中創建一張表
tableEnv.createTemporaryTable("sensor_result",
TableDescriptor
.forConnector("jdbc")
.schema(
Schema
.newBuilder()
.column("id",DataTypes.STRING().notNull())
.column("cnt", DataTypes.BIGINT())
.primaryKey("id")
.build()
)
.option("url","jdbc:mysql://127.0.0.1:3306/news?useSSL=false ")
.option("username","root")
.option("password","mysqlroot")
.option("table-name","sensor_result")
.build()
)
//使用SQL方式
// val jdbcSql =
// """
// |CREATE TABLE sensor_result2 (
// | id STRING NOT NULL,
// | cnt BIGINT NOT NULL,
// | PRIMARY KEY (id) NOT ENFORCED
// |) WITH (
// | 'connector' = 'jdbc',
// | 'url' = 'jdbc:mysql://localhost:3306/news?useSSL=false',
// | 'table-name' = 'sensor_result',
// | 'username' = 'root',
// | 'password' = 'mysqlroot'
// |)
// """.stripMargin
// tableEnv.executeSql(jdbcSql)
val sensorTable = tableEnv.sqlQuery("select * from sensor")
val aggTable = sensorTable
.groupBy($"id")
.select($"id",$"id".count() as "count")
.executeInsert("sensor_result")
tableEnv.toDataStream(sensorTable).print()
// env.execute()
}
}
maven依賴:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.14.4</version>
<!--<scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.14.4</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.14.4</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.14.4</version>
</dependency>
代码:
package com.hjt.yxh.hw.tableapitest
import com.hjt.yxh.hw.apitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table, TableResult}
object HelloWorld {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// env.setMaxParallelism(1)
val tableSetting = EnvironmentSettings
.newInstance()
.inStreamingMode()
.withBuiltInCatalogName("zealink")
.build()
//创建table Env
val tableEnv = StreamTableEnvironment.create(env, tableSetting)
//创建表
tableEnv.createTable(
"sensor",
TableDescriptor
.forConnector("kafka")
.schema(
Schema
.newBuilder()
.column("id", DataTypes.STRING())
.column("timestamp", DataTypes.BIGINT())
.column("temperature", DataTypes.DOUBLE())
.build())
.format("csv")
.option("properties.bootstrap.servers","k8s-node3:9092")
.option("properties.group.id","mygroup")
.option("topic","mytopic2")
.option("scan.startup.mode","latest-offset")
.build()
)
val dataStream = env
.readTextFile(
"D:\\LearnWorkSpace\\FlinkDemo\\src\\main\\resources\\Data\\sensor.txt")
.map(data => {
val array = data.split(",")
SensorReading(array(0), array(1).toLong, array(2).toDouble)
})
val sourceTable = tableEnv.from("sensor")
val result = sourceTable
.select($"id", $"temperature")
.filter($"id" === "sensor_1")
//创建目标表(写入到文件)
val targetTable = tableEnv.createTable("result",
TableDescriptor
.forConnector("filesystem")
.schema(Schema.newBuilder()
.column("id",DataTypes.STRING())
.column("temperature",DataTypes.DOUBLE())
.build())
.format("csv")
.option("path","D:\\LearnWorkSpace\\FlinkDemo\\src\\main\\resources\\out")
.build()
)
val kafkaTargetTable = tableEnv.createTable("result_kafka",
TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("id",DataTypes.STRING())
.column("temperature",DataTypes.DOUBLE())
.build())
.format("json")
.option("properties.bootstrap.servers","k8s-node3:9092,k8s-node5:9092,k8s-node8:9092")
.option("topic","result_topic")
.build()
)
//执行插入操作
result.executeInsert("result")
result.executeInsert("result_kafka")
val resultStream = tableEnv.toDataStream(result)
resultStream.print()
env.execute()
}
}