tEnv.createTemporaryView("new_table", datastream[, schema])
is equivalent to tEnv.createTemporaryView("new_table", tEnv.fromDataStream(datastream[, schema]))
The implicit conversion is used as follows:
import org.apache.flink.table.api.bridge.scala.dataStreamConversions
datastream.toTable(tEnv[, Schema/Expression*($("col1"), $("col2"))])
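For instance, a minimal runnable sketch of the implicit form (the Click case class, object name, and sample rows are illustrative, not part of the original API description):
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, dataStreamConversions}

case class Click(name: String, url: String)

object implicit_to_table_sketch {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(senv)
    val clicks = senv.fromElements(Click("zhang_san", "./home"), Click("li_si", "./cart"))
    // implicit conversion: DataStream -> Table, selecting columns via expressions
    val table = clicks.toTable(tEnv, $("name"), $("url"))
    // equivalent view registration, as described above
    tEnv.createTemporaryView("clicks_view", clicks)
    table.execute().print()
  }
}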
Example program:
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime._
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
case class UserClick(name: String, url: String, cTime: Long)
class RecordTimestampAssigner extends TimestampAssigner[UserClick] {
override def extractTimestamp(element: UserClick, recordTimestamp: Long): Long = {
element.cTime
}
}
class PeriodWatermarkGenerator extends WatermarkGenerator[UserClick] {
var maxTimestamp: Long = _
val maxOutOfOrderness = 5
// 1. onEvent can emit a watermark for special marker events; here it only tracks the max timestamp
override def onEvent(event: UserClick, eventTimestamp: Long, output: WatermarkOutput): Unit = {
maxTimestamp = math.max(event.cTime, maxTimestamp)
}
// 2. Watermarks are emitted periodically; the interval is set via senv.getConfig.setAutoWatermarkInterval(200L) and defaults to 200 ms
override def onPeriodicEmit(output: WatermarkOutput): Unit = {
output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1))
}
}
class MyWatermarkStrategy extends WatermarkStrategy[UserClick] {
override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[UserClick] = {
new RecordTimestampAssigner()
}
override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[UserClick] = {
new PeriodWatermarkGenerator()
}
}
object flink_test {
def main(args: Array[String]): Unit = {
val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(senv)
val datastream1 = senv.fromElements(
UserClick("zhang_san", "./home", fdf.parse("2022-01-27 07:58:18").getTime),
UserClick("li_si", "./cart", fdf.parse("2022-01-27 07:59:36").getTime)
).assignTimestampsAndWatermarks(new MyWatermarkStrategy())
println(datastream1.dataType)
datastream1.print()
val table1 = tEnv.fromDataStream(
datastream1,
Schema.newBuilder()
// Either declare none of the DataStream's columns in the schema, or declare all of them; a declared column's type may be changed
.column("name", DataTypes.STRING())
.column("url", DataTypes.STRING())
.column("cTime", DataTypes.BIGINT())
// additional computed and metadata columns
.columnByExpression("new_cTime", "to_timestamp(from_unixtime(cast(cTime as bigint) / 1000, 'yyyy-MM-dd HH:mm:ss'))")
.columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
.watermark("rowtime", "SOURCE_WATERMARK()")
.build()
)
table1.printSchema()
table1.execute().print()
senv.execute()
}
}
The execution result is as follows:
UserClick(name: String, url: String, cTime: Long)
(
`name` STRING,
`url` STRING,
`cTime` BIGINT,
`new_cTime` TIMESTAMP(3) AS to_timestamp(from_unixtime(cast(cTime as bigint) / 1000, 'yyyy-MM-dd HH:mm:ss')),
`rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
)
+----+--------------------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| op | name | url | cTime | new_cTime | rowtime |
+----+--------------------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| +I | zhang_san | ./home | 1643241498000 | 2022-01-27 07:58:18.000 | 2022-01-27 07:58:18.000 |
| +I | li_si | ./cart | 1643241576000 | 2022-01-27 07:59:36.000 | 2022-01-27 07:59:36.000 |
+----+--------------------------------+--------------------------------+----------------------+-------------------------+-------------------------+
2 rows in set
1> UserClick(zhang_san,./home,1643241498000)
2> UserClick(li_si,./cart,1643241576000)
Converting a Table back into a DataStream is done with tEnv.toDataStream(table[, DataTypes/class]).
The implicit conversion is used as follows:
import org.apache.flink.table.api.bridge.scala.tableConversions
table.toDataStream([DataTypes/class])
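A minimal runnable sketch of the implicit form (the fromValues rows and column names are illustrative; this assumes an insert-only table):
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions}
import org.apache.flink.table.api.{row, string2Literal}
import org.apache.flink.types.Row

object implicit_to_datastream_sketch {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(senv)
    val table = tEnv.fromValues(row("zhang_san", "./home"), row("li_si", "./cart")).as("name", "url")
    // implicit conversion: insert-only Table -> DataStream[Row]
    val rows: DataStream[Row] = table.toDataStream
    rows.print()
    senv.execute()
  }
}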
Example program:
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema, TableDescriptor}
/*
val datetime1:java.time.Instant = java.time.Instant.ofEpochMilli(3000L)
println(datetime1) // 1970-01-01T00:00:03Z
val timestamp1:Long = datetime1.toEpochMilli
println(timestamp1) // 3000
*/
case class User(name: String, score: Double, dataTime: java.time.Instant)
object flink_test {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(senv)
tEnv.createTemporaryTable("default_catalog.default_database.dataSource",
TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("score", DataTypes.DOUBLE())
.column("dataTime", DataTypes.TIMESTAMP_LTZ(3))
.watermark("dataTime", "dataTime - interval '10' second")
.build())
.option("rows-per-second", "1") // overall number of rows generated per second
.build()
)
// tEnv.from(TableDescriptor)
val dataSourceTable = tEnv.from("dataSource")
dataSourceTable.printSchema()
// tEnv.executeSql("select * from dataSource").print()
// tEnv.toDataStream(dataSourceTable, classOf[User])
// tEnv.toDataStream(dataSourceTable, DataTypes.of(classOf[User]))
val dataSourceDatastream: DataStream[User] = tEnv.toDataStream(dataSourceTable,
// The default result type is Row<String, Double, Timestamp_LTZ>; this form changes the target type and column types
DataTypes.STRUCTURED(
classOf[User],
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("score", DataTypes.DOUBLE()),
DataTypes.FIELD("dataTime", DataTypes.TIMESTAMP_LTZ(3))
)
)
print(dataSourceDatastream.dataType)
dataSourceDatastream.executeAndCollect().foreach(println)
}
}
The output is as follows:
(
`name` STRING,
`score` DOUBLE,
`dataTime` TIMESTAMP_LTZ(3) *ROWTIME*,
WATERMARK FOR `dataTime`: TIMESTAMP_LTZ(3) AS dataTime - interval '10' second
)
*User<`name` STRING, `score` DOUBLE, `dataTime` TIMESTAMP_LTZ(3)>*(User, org.apache.flink.table.runtime.typeutils.ExternalSerializer)
User(d089222b0e15ce426897416fa0b23b2eb6a7d35dda0d8e23f61c673608b24faac09d3e2bc12b3d21aa6c018b0cf51ef0f980,1.6976293338728425E308,2022-01-27T04:30:47.009Z)
User(379a52cd0330f35856622d68b9c86be979de32a20ab3401bc377389a59494b325b50c9cb0186e7ac27170fa7c86c9f3ed0d1,5.483394273040764E307,2022-01-27T04:30:47.009Z)
User(c38f9276eec2c9f7d8bb8cba26665ff6bde5af68474e8ea65c6944f2c5fb457932fb0ab6f50172e2826880748b384c103241,4.605199108304596E307,2022-01-27T04:30:47.009Z)
...... (remaining output omitted) ......
Converting a changelog DataStream into a Table is done with tEnv.fromChangelogStream(datastream[, Schema[, ChangelogMode]]).
The implicit conversion is used as follows:
import org.apache.flink.table.api.bridge.scala.dataStreamConversions
datastream.toChangelogTable(tEnv[, Schema[, ChangelogMode]])
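A minimal runnable sketch of the implicit form (the retract-style rows below are illustrative):
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, dataStreamConversions}
import org.apache.flink.types.{Row, RowKind}

object implicit_to_changelog_table_sketch {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(senv)
    // a retract-style changelog: UPDATE_BEFORE/UPDATE_AFTER pairs
    val changelog = senv.fromElements(
      Row.ofKind(RowKind.INSERT, "zhang_san", Int.box(10)),
      Row.ofKind(RowKind.UPDATE_BEFORE, "zhang_san", Int.box(10)),
      Row.ofKind(RowKind.UPDATE_AFTER, "zhang_san", Int.box(20))
    )(Types.ROW(Types.STRING, Types.INT))
    // implicit conversion: DataStream[Row] -> changelog Table (all RowKinds by default)
    val table = changelog.toChangelogTable(tEnv)
    table.execute().print()
  }
}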
Example program:
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime._
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}
class RecordTimestampAssigner extends TimestampAssigner[Row] {
override def extractTimestamp(element: Row, recordTimestamp: Long): Long = {
element.getFieldAs[Long]("f2")
}
}
class PeriodWatermarkGenerator extends WatermarkGenerator[Row] {
var maxTimestamp: Long = _
val maxOutOfOrderness = 5
// 1. onEvent can emit a watermark for special marker events; here it only tracks the max timestamp
override def onEvent(event: Row, eventTimestamp: Long, output: WatermarkOutput): Unit = {
maxTimestamp = math.max(event.getFieldAs[Long]("f2"), maxTimestamp)
}
// 2. Watermarks are emitted periodically; the interval is set via senv.getConfig.setAutoWatermarkInterval(200L) and defaults to 200 ms
override def onPeriodicEmit(output: WatermarkOutput): Unit = {
output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1))
}
}
class MyWatermarkStrategy extends WatermarkStrategy[Row] {
override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[Row] = {
new RecordTimestampAssigner()
}
override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Row] = {
new PeriodWatermarkGenerator()
}
}
object flink_test {
def main(args: Array[String]): Unit = {
val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(senv)
// With RowKind.UPDATE_BEFORE and RowKind.UPDATE_AFTER present, this is a retract-style changelog stream
// With only RowKind.UPDATE_AFTER, it is an upsert-style changelog stream
val datastream1 = senv.fromElements(
Row.ofKind(RowKind.INSERT, "zhang_san", "./home", Long.box(fdf.parse("2022-01-27 07:58:18").getTime)),
Row.ofKind(RowKind.INSERT, "li_si", "./cart", Long.box(fdf.parse("2022-01-27 07:59:36").getTime)),
Row.ofKind(RowKind.UPDATE_AFTER, "li_si", "./cart2", Long.box(fdf.parse("2022-01-27 07:59:36").getTime)),
Row.ofKind(RowKind.DELETE, "zhang_san", "./home", Long.box(fdf.parse("2022-01-27 12:59:36").getTime))
)(Types.ROW(Types.STRING, Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(new MyWatermarkStrategy())
println(datastream1.dataType)
datastream1.print()
val table1 = tEnv.fromChangelogStream(datastream1,
Schema.newBuilder()
.primaryKey("f0", "f1")
.columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
.build(),
ChangelogMode.upsert()
)
table1.printSchema()
table1.execute().print()
senv.execute()
}
}
The result is as follows:
Row(f0: String, f1: String, f2: Long)
(
`f0` STRING NOT NULL,
`f1` STRING NOT NULL,
`f2` BIGINT,
`rowtime` TIMESTAMP_LTZ(3) METADATA,
CONSTRAINT `PK_f0_f1` PRIMARY KEY (`f0`, `f1`) NOT ENFORCED
)
+----+--------------------------------+--------------------------------+----------------------+-------------------------+
| op | f0 | f1 | f2 | rowtime |
+----+--------------------------------+--------------------------------+----------------------+-------------------------+
| +I | li_si | ./cart | 1643241576000 | 2022-01-27 07:59:36.000 |
| +I | li_si | ./cart2 | 1643241576000 | 2022-01-27 07:59:36.000 |
| +I | zhang_san | ./home | 1643241498000 | 2022-01-27 07:58:18.000 |
| -D | zhang_san | ./home | 1643241498000 | 2022-01-27 07:58:18.000 |
+----+--------------------------------+--------------------------------+----------------------+-------------------------+
4 rows in set
1> -D[zhang_san, ./home, 1643259576000]
6> +I[zhang_san, ./home, 1643241498000]
7> +I[li_si, ./cart, 1643241576000]
8> +U[li_si, ./cart2, 1643241576000]
Converting a Table into a changelog DataStream is done with tEnv.toChangelogStream(table[, Schema[, ChangelogMode]]).
The implicit conversions are used as follows:
import org.apache.flink.table.api.bridge.scala.tableConversions
table.toChangelogStream([Schema[, ChangelogMode]])
import org.apache.flink.table.api.bridge.scala.tableToChangelogDataStream
val datastream: DataStream[Row] = table
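A minimal runnable sketch of both implicit forms (the aggregation and sample rows are illustrative):
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions, tableToChangelogDataStream}
import org.apache.flink.table.api.{int2Literal, row, string2Literal}
import org.apache.flink.types.Row

object implicit_to_changelog_stream_sketch {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(senv)
    val resultTable = tEnv.fromValues(row("zhang_san", 10), row("zhang_san", 20)).as("name", "amount")
      .groupBy($("name"))
      .select($("name"), $("amount").sum().as("amount"))
    // explicit call through the tableConversions implicit
    val updates: DataStream[Row] = resultTable.toChangelogStream
    // or let tableToChangelogDataStream convert the updating Table directly
    val updates2: DataStream[Row] = resultTable
    updates.print()
    senv.execute()
  }
}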
Example program:
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema, long2Literal, row, string2Literal}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.Row
import scala.collection.JavaConversions.asScalaIterator
object flink_test {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(senv)
val table1 = tEnv.fromValues(
row("zhang_san", 10, "2022-01-27 15:00:06"),
row("li_si", 100, "2022-01-27 16:00:06"),
row("zhang_san", 20, "2022-01-27 15:00:16"),
row("zhang_san", 30, "2022-01-27 15:00:36"),
row("li_si", 200, "2022-01-27 16:00:56")
).as("name", "amount", "dataTime")
table1.printSchema()
table1.execute().collect().foreach(println)
val resultTable = table1
.groupBy($("name"))
.select($("name"), $("amount").sum().as("amount"))
resultTable.printSchema()
resultTable.execute().print()
val resultDatastream: DataStream[Row] = tEnv.toChangelogStream(
resultTable,
Schema.newBuilder()
.column("name", DataTypes.STRING().bridgedTo(classOf[String]))
.column("amount", DataTypes.BIGINT().bridgedTo(classOf[java.lang.Long]))
.build(),
ChangelogMode.all()
)
print(resultDatastream.dataType)
resultDatastream.print()
resultDatastream.executeAndCollect()
.foreach(
row => {
println(row.getKind)
println(row.getFieldNames(true))
}
)
}
}
The result is as follows:
(
`name` VARCHAR(9) NOT NULL,
`amount` BIGINT NOT NULL,
`dataTime` CHAR(19) NOT NULL
)
+I[zhang_san, 10, 2022-01-27 15:00:06]
+I[li_si, 100, 2022-01-27 16:00:06]
+I[zhang_san, 20, 2022-01-27 15:00:16]
+I[zhang_san, 30, 2022-01-27 15:00:36]
+I[li_si, 200, 2022-01-27 16:00:56]
(
`name` VARCHAR(9) NOT NULL,
`amount` BIGINT NOT NULL
)
+----+--------------------------------+----------------------+
| op | name | amount |
+----+--------------------------------+----------------------+
| +I | zhang_san | 10 |
| -U | zhang_san | 10 |
| +U | zhang_san | 30 |
| -U | zhang_san | 30 |
| +U | zhang_san | 60 |
| +I | li_si | 100 |
| -U | li_si | 100 |
| +U | li_si | 300 |
+----+--------------------------------+----------------------+
8 rows in set
ROW<`name` STRING, `amount` BIGINT> NOT NULL(org.apache.flink.types.Row, org.apache.flink.table.runtime.typeutils.ExternalSerializer)
2> +I[zhang_san, 10]
4> +I[li_si, 100]
4> -U[li_si, 100]
2> -U[zhang_san, 10]
4> +U[li_si, 300]
2> +U[zhang_san, 30]
2> -U[zhang_san, 30]
2> +U[zhang_san, 60]
INSERT
[name, amount]
UPDATE_BEFORE
[name, amount]
UPDATE_AFTER
[name, amount]
INSERT
[name, amount]
UPDATE_BEFORE
[name, amount]
UPDATE_AFTER
[name, amount]
UPDATE_BEFORE
[name, amount]
UPDATE_AFTER
[name, amount]
Note:
Multiple Table/SQL insert operations can be attached to the DataStream job: attachAsDataStream() converts them into the StreamExecutionEnvironment's pipeline and clears the statement set, and the whole job is finally triggered by senv.execute():
val statementSet = tEnv.createStatementSet()
statementSet.addInsert(......)
statementSet.addInsertSql(......)
statementSet.attachAsDataStream()
senv.execute()
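For reference, a minimal runnable sketch (the datagen/blackhole tables and all names are illustrative assumptions, not from the original):
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object attach_as_datastream_sketch {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(senv)
    // hypothetical tables; replace with real connectors
    tEnv.executeSql("create table src (name string) with ('connector' = 'datagen', 'rows-per-second' = '1')")
    tEnv.executeSql("create table snk1 (name string) with ('connector' = 'blackhole')")
    tEnv.executeSql("create table snk2 (name string) with ('connector' = 'blackhole')")
    val statementSet = tEnv.createStatementSet()
    statementSet.addInsert("snk1", tEnv.from("src"))
    statementSet.addInsertSql("insert into snk2 select name from src")
    // attach the pending inserts to the DataStream pipeline and clear the statement set
    statementSet.attachAsDataStream()
    // pure DataStream operations can be part of the same job
    senv.fromElements("datastream part").print()
    senv.execute()
  }
}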