Flink Sql 中 Watermark 使用方式

卫昊东
2023-12-01

1、创建 env 和 tableEnv 环境

2、利用 Sql 定义 Source 数据源(并在数据源中定义Watermark和event-time),连接Source

3、利用滑动窗口Hop测试Watermark和event-time是否定义成功

4、利用Window TopN实现业务逻辑

package org.example.hot.items

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

/**
 * Hot-items Top-N example written with Flink SQL.
 *
 * The job registers a CSV-backed source table whose DDL declares an event-time
 * column (`ts`) and a watermark on it, then computes the 10 most viewed items
 * per sliding (HOP) window in two equivalent ways:
 *
 *  1. a single nested SQL query;
 *  2. a step-by-step variant that first registers the windowed aggregation as
 *     a view (`agg_table`) and then ranks the rows of that view.
 *
 * Both results are converted to retract streams and printed.
 */
object HotItemsWithSqlNew {

  /**
   * Builds and runs the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Source table. `ts` is a computed column derived from `action_time`
    // (FROM_UNIXTIME interprets the value as epoch seconds) and is declared as
    // the event-time attribute via WATERMARK, allowing 1 second of lateness.
    val inputTable =
      """
        |CREATE TABLE input_table (
        | user_id BIGINT
        | ,item_id BIGINT
        | ,category_id INT
        | ,behavior VARCHAR(11)
        | ,action_time BIGINT
        | ,ts as TO_TIMESTAMP(FROM_UNIXTIME(action_time, 'yyyy-MM-dd HH:mm:ss'))
        | ,WATERMARK FOR ts as ts - INTERVAL '1' SECOND
        |) WITH (
        | 'connector' = 'filesystem'
        | ,'path' = 'F:\\workspace\\idea\\flink_study_core\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources'
        | ,'format' = 'csv'
        |)
        |""".stripMargin

    tableEnv.executeSql(inputTable)

    // Approach 1: implement the whole business logic in a single SQL statement.
    // The sliding window (HOP) on `ts` only works if the watermark and
    // event-time attribute were defined correctly in the source DDL.
    val resultTableStr =
      """
        |select *
        |from (
        | select *
        |   ,row_number() over (partition by windowEnd order by cnt desc) as row_num
        | from (
        |   SELECT item_id
        |     ,HOP_END(ts, INTERVAL '1' SECONDS, INTERVAL '2' SECONDS) windowEnd
        |       ,COUNT(item_id) as cnt
        |   FROM input_table
        |   WHERE behavior = 'pv'
        |   GROUP BY
        |       item_id
        |       , HOP(ts, INTERVAL '1' SECONDS, INTERVAL '2' SECONDS)
        |   ) AS tmp_agg)
        |where row_num <= 10
        |""".stripMargin

    val resultTable = tableEnv.sqlQuery(resultTableStr)
    resultTable.toRetractStream[Row].print("result")

    // Approach 2: implement the logic step by step through an intermediate VIEW.
    // Step 2.1: register the per-item, per-HOP-window count as a view.
    // Fix: CREATE VIEW requires the `AS` keyword before the defining query.
    val aggTable =
      """
        |CREATE VIEW agg_table AS
        |SELECT item_id
        |  ,HOP_END(ts, INTERVAL '1' SECONDS, INTERVAL '2' SECONDS) windowEnd
        |  ,COUNT(item_id) as cnt
        |FROM input_table
        |WHERE behavior = 'pv'
        |GROUP BY
        |   item_id
        |   , HOP(ts, INTERVAL '1' SECONDS, INTERVAL '2' SECONDS)
        |
        |""".stripMargin

    // Fix: CREATE VIEW is a DDL statement, so it has to be submitted with
    // executeSql; sqlQuery only accepts SELECT-style queries.
    tableEnv.executeSql(aggTable)

    // Step 2.2: rank the rows of every window by count and keep the top 10.
    // Fix: the sub-query is now closed with `)` before the outer WHERE clause.
    val tableStr =
      """
        |select *
        |from (
        | select *
        |   ,row_number() over (partition by windowEnd order by cnt desc) as row_num
        | from agg_table)
        |where row_num <= 10
        |""".stripMargin

    val table = tableEnv.sqlQuery(tableStr)
    table.toRetractStream[Row].print("table")

    // The Table -> DataStream conversions above are part of the DataStream
    // topology, so the job is started through the stream environment.
    env.execute("HotItemsWithSqlNew")
  }
}

注:

1、定义事件时间(event-time)语义时,需要注意时间字段的数据类型转换。Source 数据源中的时间一般以时间戳(BIGINT)的形式保存,FROM_UNIXTIME 会按秒级时间戳进行解析,可以通过下面的方式将其转换为 TIMESTAMP(3) 类型:

TO_TIMESTAMP(FROM_UNIXTIME(action_time, 'yyyy-MM-dd HH:mm:ss'))

 类似资料: