
Converting between Table and DataStream in Flink Table API and SQL (fromDataStream, toChangelogStream, attachAsDataStream)

汪成仁
2023-12-01

1. tEnv.fromDataStream(datastream[, schema])

tEnv.createTemporaryView("new_table", datastream[, schema]) is equivalent to tEnv.createTemporaryView("new_table", tEnv.fromDataStream(datastream[, schema])).
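
For example (a sketch; datastream1 is the UserClick stream from the sample program below), registering the view makes the stream queryable by name in SQL:

// register the stream as a named view, then query it with SQL
tEnv.createTemporaryView("user_clicks", datastream1)
tEnv.sqlQuery("select name, url from user_clicks")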

Implicit conversion is used as follows:

import org.apache.flink.table.api.bridge.scala.dataStreamConversions
datastream.toTable(tEnv[, Schema/Expression*($("col1"), $("col2"))])
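
A concrete usage sketch (datastream1 and tEnv as in the sample program below; the column expressions are just for illustration):

import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.bridge.scala.dataStreamConversions

// derive the schema automatically...
val tableA = datastream1.toTable(tEnv)
// ...or select the columns explicitly with expressions
val tableB = datastream1.toTable(tEnv, $("name"), $("url"), $("cTime"))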

Sample program:

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime._
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}

case class UserClick(name: String, url: String, cTime: Long)

class RecordTimestampAssigner extends TimestampAssigner[UserClick] {

  override def extractTimestamp(element: UserClick, recordTimestamp: Long): Long = {

    element.cTime

  }

}

class PeriodWatermarkGenerator extends WatermarkGenerator[UserClick] {
  var maxTimestamp: Long = _
  val maxOutofOrderness = 5

  // 1. Called on each event; a watermark can be emitted here based on specific events
  override def onEvent(event: UserClick, eventTimestamp: Long, output: WatermarkOutput): Unit = {

    maxTimestamp = math.max(event.cTime, maxTimestamp)

  }

  // 2. Emits watermarks periodically; the interval is set via senv.getConfig.setAutoWatermarkInterval(200L), default 200 ms
  override def onPeriodicEmit(output: WatermarkOutput): Unit = {

    output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1))
  }
}


class MyWatermarkStrategy extends WatermarkStrategy[UserClick] {

  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[UserClick] = {

    new RecordTimestampAssigner()
  }

  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[UserClick] = {
    new PeriodWatermarkGenerator()

  }

}


object flink_test {

  def main(args: Array[String]): Unit = {

    val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    val datastream1 = senv.fromElements(
      UserClick("zhang_san", "./home", fdf.parse("2022-01-27 07:58:18").getTime),
      UserClick("li_si", "./cart", fdf.parse("2022-01-27 07:59:36").getTime)
    ).assignTimestampsAndWatermarks(new MyWatermarkStrategy())

    println(datastream1.dataType)
    datastream1.print()

    val table1 = tEnv.fromDataStream(
      datastream1,
      Schema.newBuilder()
        // either declare none of the datastream's columns in the schema, or declare all of them; a declared column's type may be changed
        .column("name", DataTypes.STRING())
        .column("url", DataTypes.STRING())
        .column("cTime", DataTypes.BIGINT())
        // derived column
        .columnByExpression("new_cTime", "to_timestamp(from_unixtime(cast(cTime as bigint) / 1000, 'yyyy-MM-dd HH:mm:ss'))")
        .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build()
    )
    table1.printSchema()
    table1.execute().print()

    senv.execute()

  }
}

The execution output is as follows:

UserClick(name: String, url: String, cTime: Long)
(
  `name` STRING,
  `url` STRING,
  `cTime` BIGINT,
  `new_cTime` TIMESTAMP(3) AS to_timestamp(from_unixtime(cast(cTime as bigint) / 1000, 'yyyy-MM-dd HH:mm:ss')),
  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
)
+----+--------------------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| op |                           name |                            url |                cTime |               new_cTime |                 rowtime |
+----+--------------------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| +I |                      zhang_san |                         ./home |        1643241498000 | 2022-01-27 07:58:18.000 | 2022-01-27 07:58:18.000 |
| +I |                          li_si |                         ./cart |        1643241576000 | 2022-01-27 07:59:36.000 | 2022-01-27 07:59:36.000 |
+----+--------------------------------+--------------------------------+----------------------+-------------------------+-------------------------+
2 rows in set
1> UserClick(zhang_san,./home,1643241498000)
2> UserClick(li_si,./cart,1643241576000)

Notes:

  • Only insert-only datastreams are supported; a datastream of any other kind is still treated as insert-only
  • The table automatically derives its column names and types from the datastream
  • If the datastream carries timestamps and watermarks, Schema.columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)) exposes the stream's timestamp as a regular column, and Schema.watermark("rowtime", "SOURCE_WATERMARK()") turns the stream's watermark into the table's watermark; the rowtime column then automatically becomes a time-attribute (rowtime) column. The watermark SQL expression can also be something like "rowtime - interval '10' second" (see the sketch below)
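
For instance, a bounded-out-of-orderness watermark can be declared on the metadata column instead of reusing the source watermark. A minimal sketch, reusing datastream1 from the sample program above:

val table2 = tEnv.fromDataStream(
  datastream1,
  Schema.newBuilder()
    // expose the stream's timestamp as a column...
    .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
    // ...and declare a watermark that lags it by 10 seconds
    .watermark("rowtime", "rowtime - interval '10' second")
    .build()
)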

2. tEnv.toDataStream(table[, DataTypes/class])

Implicit conversion is used as follows:

import org.apache.flink.table.api.bridge.scala.tableConversions
table.toDataStream([DataTypes/class])

Sample program:

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema, TableDescriptor}

/*
    val datetime1:java.time.Instant = java.time.Instant.ofEpochMilli(3000L)
    println(datetime1)      // 1970-01-01T00:00:03Z
    val timestamp1:Long = datetime1.toEpochMilli
    println(timestamp1)     // 3000
*/
case class User(name: String, score: Double, dataTime: java.time.Instant)

object flink_test {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    tEnv.createTemporaryTable("default_catalog.default_database.dataSource",
      TableDescriptor.forConnector("datagen")
        .schema(Schema.newBuilder()
          .column("name", DataTypes.STRING())
          .column("score", DataTypes.DOUBLE())
          .column("dataTime", DataTypes.TIMESTAMP_LTZ(3))
          .watermark("dataTime", "dataTime - interval '10' second")
          .build())
        .option("rows-per-second", "1") // 每个slot每秒产生的数据量
        .build()
    )

    // tEnv.from(TableDescriptor)
    val dataSourceTable = tEnv.from("dataSource")
    dataSourceTable.printSchema()
    // tEnv.executeSql("select * from dataSource").print()

    // tEnv.toDataStream(dataSourceTable, classOf[User])
    // tEnv.toDataStream(dataSourceTable, DataTypes.of(classOf[User]))
    val dataSourceDatastream: DataStream[User] = tEnv.toDataStream(dataSourceTable,
      // the default type is Row<String, Double, Timestamp_LTZ>; this form can also change the column types
      DataTypes.STRUCTURED(
        classOf[User],
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("score", DataTypes.DOUBLE()),
        DataTypes.FIELD("dataTime", DataTypes.TIMESTAMP_LTZ(3))
      )
    )
    print(dataSourceDatastream.dataType)
    dataSourceDatastream.executeAndCollect().foreach(println)


  }
}

The output is as follows:

(
  `name` STRING,
  `score` DOUBLE,
  `dataTime` TIMESTAMP_LTZ(3) *ROWTIME*,
  WATERMARK FOR `dataTime`: TIMESTAMP_LTZ(3) AS dataTime - interval '10' second
)
*User<`name` STRING, `score` DOUBLE, `dataTime` TIMESTAMP_LTZ(3)>*(User, org.apache.flink.table.runtime.typeutils.ExternalSerializer)User(d089222b0e15ce426897416fa0b23b2eb6a7d35dda0d8e23f61c673608b24faac09d3e2bc12b3d21aa6c018b0cf51ef0f980,1.6976293338728425E308,2022-01-27T04:30:47.009Z)
User(379a52cd0330f35856622d68b9c86be979de32a20ab3401bc377389a59494b325b50c9cb0186e7ac27170fa7c86c9f3ed0d1,5.483394273040764E307,2022-01-27T04:30:47.009Z)
User(c38f9276eec2c9f7d8bb8cba26665ff6bde5af68474e8ea65c6944f2c5fb457932fb0ab6f50172e2826880748b384c103241,4.605199108304596E307,2022-01-27T04:30:47.009Z)
...... (remaining output omitted) ......

Notes:

  • Only insert-only tables are supported; any other kind of table raises an error immediately
  • Timestamps and watermarks are propagated to the datastream automatically
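
As the commented-out lines in the example hint, when the column names and types already map 1:1 onto the target class fields, the structured type can also be derived directly from the class. A minimal sketch under that assumption:

// shorthand when the columns already match the User fields 1:1
val usersByClass: DataStream[User] = tEnv.toDataStream(dataSourceTable, classOf[User])
// without a target type, the result is a DataStream[Row]
val rowStream = tEnv.toDataStream(dataSourceTable)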

3. tEnv.fromChangelogStream(datastream[, Schema[, ChangelogMode]])

Implicit conversion is used as follows:

import org.apache.flink.table.api.bridge.scala.dataStreamConversions
datastream.toChangelogTable(tEnv[, Schema[, ChangelogMode]])

Sample program:

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime._
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}


class RecordTimestampAssigner extends TimestampAssigner[Row] {

  override def extractTimestamp(element: Row, recordTimestamp: Long): Long = {

    element.getFieldAs[Long]("f2")

  }

}

class PeriodWatermarkGenerator extends WatermarkGenerator[Row] {
  var maxTimestamp: Long = _
  val maxOutofOrderness = 5

  // 1. Called on each event; a watermark can be emitted here based on specific events
  override def onEvent(event: Row, eventTimestamp: Long, output: WatermarkOutput): Unit = {

    maxTimestamp = math.max(event.getFieldAs[Long]("f2"), maxTimestamp)

  }

  // 2. Emits watermarks periodically; the interval is set via senv.getConfig.setAutoWatermarkInterval(200L), default 200 ms
  override def onPeriodicEmit(output: WatermarkOutput): Unit = {

    output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1))
  }
}


class MyWatermarkStrategy extends WatermarkStrategy[Row] {

  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[Row] = {

    new RecordTimestampAssigner()
  }

  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Row] = {
    new PeriodWatermarkGenerator()

  }

}


object flink_test {

  def main(args: Array[String]): Unit = {

    val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    // with RowKind.UPDATE_BEFORE and RowKind.UPDATE_AFTER, this is a retract-form changelog stream
    // with only RowKind.UPDATE_AFTER, it is an upsert-form changelog stream
    val datastream1 = senv.fromElements(
      Row.ofKind(RowKind.INSERT, "zhang_san", "./home", Long.box(fdf.parse("2022-01-27 07:58:18").getTime)),
      Row.ofKind(RowKind.INSERT, "li_si", "./cart", Long.box(fdf.parse("2022-01-27 07:59:36").getTime)),
      Row.ofKind(RowKind.UPDATE_AFTER, "li_si", "./cart2", Long.box(fdf.parse("2022-01-27 07:59:36").getTime)),
      Row.ofKind(RowKind.DELETE, "zhang_san", "./home", Long.box(fdf.parse("2022-01-27 12:59:36").getTime))
    )(Types.ROW(Types.STRING, Types.STRING, Types.LONG))
      .assignTimestampsAndWatermarks(new MyWatermarkStrategy())

    println(datastream1.dataType)
    datastream1.print()

    val table1 = tEnv.fromChangelogStream(datastream1,
      Schema.newBuilder()
        .primaryKey("f0", "f1")
        .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
        .build(),
      ChangelogMode.upsert()
    )
    table1.printSchema()
    table1.execute().print()

    senv.execute()

  }
}

The output is as follows:

Row(f0: String, f1: String, f2: Long)
(
  `f0` STRING NOT NULL,
  `f1` STRING NOT NULL,
  `f2` BIGINT,
  `rowtime` TIMESTAMP_LTZ(3) METADATA,
  CONSTRAINT `PK_f0_f1` PRIMARY KEY (`f0`, `f1`) NOT ENFORCED
)
+----+--------------------------------+--------------------------------+----------------------+-------------------------+
| op |                             f0 |                             f1 |                   f2 |                 rowtime |
+----+--------------------------------+--------------------------------+----------------------+-------------------------+
| +I |                          li_si |                         ./cart |        1643241576000 | 2022-01-27 07:59:36.000 |
| +I |                          li_si |                        ./cart2 |        1643241576000 | 2022-01-27 07:59:36.000 |
| +I |                      zhang_san |                         ./home |        1643241498000 | 2022-01-27 07:58:18.000 |
| -D |                      zhang_san |                         ./home |        1643241498000 | 2022-01-27 07:58:18.000 |
+----+--------------------------------+--------------------------------+----------------------+-------------------------+
4 rows in set
1> -D[zhang_san, ./home, 1643259576000]
6> +I[zhang_san, ./home, 1643241498000]
7> +I[li_si, ./cart, 1643241576000]
8> +U[li_si, ./cart2, 1643241576000]

Notes:

  • The datastream's element type must be Row, and each Row carries a RowKind
  • Timestamps and watermarks are not propagated by default
  • ChangelogMode defaults to all() (retract form) and declares which kinds of rows the datastream contains (see the sketch below)
  • ChangelogMode.upsert() requires a primary key; with it, a RowKind.UPDATE_AFTER in the datastream becomes an insert in the table
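
For comparison, a retract-form changelog carries explicit UPDATE_BEFORE/UPDATE_AFTER pairs and needs no primary key. A minimal sketch (reusing the imports from the sample program above; the rows are illustrative):

// retract form: an update arrives as a -U/+U pair
val retractStream = senv.fromElements(
  Row.ofKind(RowKind.INSERT, "li_si", "./cart", Long.box(1643241576000L)),
  Row.ofKind(RowKind.UPDATE_BEFORE, "li_si", "./cart", Long.box(1643241576000L)),
  Row.ofKind(RowKind.UPDATE_AFTER, "li_si", "./cart2", Long.box(1643241576000L))
)(Types.ROW(Types.STRING, Types.STRING, Types.LONG))

// the default ChangelogMode.all() accepts every RowKind
val retractTable = tEnv.fromChangelogStream(retractStream)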

4. tEnv.toChangelogStream(table[, Schema[, ChangelogMode]])

Implicit conversion is used as follows:

import org.apache.flink.table.api.bridge.scala.tableConversions
table.toChangelogStream([Schema[, ChangelogMode]])

import org.apache.flink.table.api.bridge.scala.tableToChangelogDataStream
val datastream:DataStream[Row] = table

Sample program:

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema, long2Literal, row, string2Literal}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.Row
import scala.collection.JavaConversions.asScalaIterator


object flink_test {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)
    

    val table1 = tEnv.fromValues(
      row("zhang_san", 10, "2022-01-27 15:00:06"),
      row("li_si", 100, "2022-01-27 16:00:06"),
      row("zhang_san", 20, "2022-01-27 15:00:16"),
      row("zhang_san", 30, "2022-01-27 15:00:36"),
      row("li_si", 200, "2022-01-27 16:00:56")
    ).as("name", "amount", "dataTime")

    table1.printSchema()
    table1.execute().collect().foreach(println)

    val resultTable = table1
      .groupBy($("name"))
      .select($("name"), $("amount").sum().as("amount"))

    resultTable.printSchema()
    resultTable.execute().print()


    val resultDatastream: DataStream[Row] = tEnv.toChangelogStream(
      resultTable,
      Schema.newBuilder()
        .column("name", DataTypes.STRING().bridgedTo(classOf[String]))
        .column("amount", DataTypes.BIGINT().bridgedTo(classOf[java.lang.Long]))
        .build(),
      ChangelogMode.all()
    )
    print(resultDatastream.dataType)
    resultDatastream.print()

    resultDatastream.executeAndCollect()
      .foreach(
        row => {
          println(row.getKind)
          println(row.getFieldNames(true))
        }
      )
    
  }
}

The output is as follows:

(
  `name` VARCHAR(9) NOT NULL,
  `amount` BIGINT NOT NULL,
  `dataTime` CHAR(19) NOT NULL
)
+I[zhang_san, 10, 2022-01-27 15:00:06]
+I[li_si, 100, 2022-01-27 16:00:06]
+I[zhang_san, 20, 2022-01-27 15:00:16]
+I[zhang_san, 30, 2022-01-27 15:00:36]
+I[li_si, 200, 2022-01-27 16:00:56]
(
  `name` VARCHAR(9) NOT NULL,
  `amount` BIGINT NOT NULL
)
+----+--------------------------------+----------------------+
| op |                           name |               amount |
+----+--------------------------------+----------------------+
| +I |                      zhang_san |                   10 |
| -U |                      zhang_san |                   10 |
| +U |                      zhang_san |                   30 |
| -U |                      zhang_san |                   30 |
| +U |                      zhang_san |                   60 |
| +I |                          li_si |                  100 |
| -U |                          li_si |                  100 |
| +U |                          li_si |                  300 |
+----+--------------------------------+----------------------+
8 rows in set
ROW<`name` STRING, `amount` BIGINT> NOT NULL(org.apache.flink.types.Row, org.apache.flink.table.runtime.typeutils.ExternalSerializer)2> +I[zhang_san, 10]
4> +I[li_si, 100]
4> -U[li_si, 100]
2> -U[zhang_san, 10]
4> +U[li_si, 300]
2> +U[zhang_san, 30]
2> -U[zhang_san, 30]
2> +U[zhang_san, 60]
INSERT
[name, amount]
UPDATE_BEFORE
[name, amount]
UPDATE_AFTER
[name, amount]
INSERT
[name, amount]
UPDATE_BEFORE
[name, amount]
UPDATE_AFTER
[name, amount]
UPDATE_BEFORE
[name, amount]
UPDATE_AFTER
[name, amount]

Notes:

  • The table's timestamps and watermarks are propagated to the datastream
  • ChangelogMode declares the form of the table's changelog. In this example the table produces a retract stream, so specifying ChangelogMode.insertOnly() raises an error,
    while ChangelogMode.upsert() yields a datastream without the -U rows (see the sketch below)
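
A minimal sketch of the upsert variant (reusing resultTable from the example; the upsert key is declared via the schema's primary key, here the groupBy column name):

// upsert form: -U rows are dropped, updates arrive as +U keyed by name
val upsertStream: DataStream[Row] = tEnv.toChangelogStream(
  resultTable,
  Schema.newBuilder().primaryKey("name").build(),
  ChangelogMode.upsert()
)
upsertStream.print()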

5. StatementSet.attachAsDataStream()

Converts the buffered INSERT operations of a Table/SQL statement set into DataStream pipelines and clears the statement set; execution is finally triggered via senv.execute():

val statementSet = tEnv.createStatementSet()

statementSet.addInsert(......)
statementSet.addInsertSql(......)

statementSet.attachAsDataStream()

senv.execute()
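
A runnable sketch under assumed table definitions (the datagen source, blackhole sink, and table names below are hypothetical, purely for illustration):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema, TableDescriptor}

object flink_test {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(senv)

    // hypothetical source and sink tables, for illustration only
    tEnv.createTemporaryTable("src", TableDescriptor.forConnector("datagen")
      .schema(Schema.newBuilder().column("num", DataTypes.BIGINT()).build())
      .option("rows-per-second", "1")
      .build())
    tEnv.createTemporaryTable("snk", TableDescriptor.forConnector("blackhole")
      .schema(Schema.newBuilder().column("num", DataTypes.BIGINT()).build())
      .build())

    val statementSet = tEnv.createStatementSet()
    statementSet.addInsertSql("insert into snk select num from src")

    // translate the buffered inserts into the DataStream graph and clear the set
    statementSet.attachAsDataStream()

    // additional DataStream pipelines could be defined here;
    // a single senv.execute() then triggers everything
    senv.execute()
  }
}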