2、利用 Sql 定义 Source 数据源(并在数据源中定义Watermark和event-time),连接Source
3、利用滑动窗口Hop测试Watermark和event-time是否定义成功
4、利用Window TopN实现业务逻辑
package org.example.hot.items
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
/**
 * Hot-items (Top-N) example implemented purely with Flink SQL.
 *
 * The job:
 *  1. registers a filesystem/CSV source table whose DDL derives an event-time
 *     column `ts` from the epoch column `action_time` and declares a watermark on it;
 *  2. counts `pv` events per item in HOP (sliding) windows;
 *  3. ranks the items inside every window with `ROW_NUMBER()` and keeps the first 10.
 *
 * The same logic is executed twice: once as a single nested SQL statement and
 * once split in two steps through an intermediate VIEW.
 */
object HotItemsWithSqlNew {

  /**
   * Builds the table environment, registers the source, and runs both Top-N variants.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Source DDL: `ts` is a computed column built from the epoch value in
    // `action_time`; the watermark trails `ts` by one second.
    val inputTable =
      """
        |CREATE TABLE input_table (
        | user_id BIGINT
        | ,item_id BIGINT
        | ,category_id INT
        | ,behavior VARCHAR(11)
        | ,action_time BIGINT
        | ,ts as TO_TIMESTAMP(FROM_UNIXTIME(action_time, 'yyyy-MM-dd HH:mm:ss'))
        | ,WATERMARK FOR ts as ts - INTERVAL '1' SECOND
        |) WITH (
        | 'connector' = 'filesystem'
        | ,'path' = 'F:\\workspace\\idea\\flink_study_core\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources'
        | ,'format' = 'csv'
        |)
        |""".stripMargin
    tableEnv.executeSql(inputTable)

    // Approach 1: implement the whole business logic in a single SQL statement.
    // The HOP (sliding) window also verifies that the watermark / event-time
    // attribute declared in the source DDL is usable.
    val resultTableStr =
      """
        |select *
        |from (
        | select *
        | ,row_number() over (partition by windowEnd order by cnt desc) as row_num
        | from (
        | SELECT item_id
        | ,HOP_END(ts, INTERVAL '1' SECONDS, INTERVAL '2' SECONDS) windowEnd
        | ,COUNT(item_id) as cnt
        | FROM input_table
        | WHERE behavior = 'pv'
        | GROUP BY
        | item_id
        | , HOP(ts, INTERVAL '1' SECONDS, INTERVAL '2' SECONDS)
        | ) AS tmp_agg)
        |where row_num <= 10
        |""".stripMargin
    val resultTable = tableEnv.sqlQuery(resultTableStr)
    // The ranking is continuously updated, so a retract stream is required.
    resultTable.toRetractStream[Row].print("result")

    // Approach 2: implement the logic step by step through an intermediate VIEW.
    // Step 1: windowed aggregation registered as the view `agg_table`.
    // `CREATE VIEW` requires the `AS` keyword before the defining query.
    val aggTable =
      """
        |CREATE VIEW agg_table AS
        |SELECT item_id
        | ,HOP_END(ts, INTERVAL '1' SECONDS, INTERVAL '2' SECONDS) windowEnd
        | ,COUNT(item_id) as cnt
        |FROM input_table
        |WHERE behavior = 'pv'
        |GROUP BY
        | item_id
        | , HOP(ts, INTERVAL '1' SECONDS, INTERVAL '2' SECONDS)
        |
        |""".stripMargin
    // DDL statements must be submitted with executeSql; sqlQuery only accepts queries.
    tableEnv.executeSql(aggTable)

    // Step 2: Top-N over the view. The derived table is closed with ')' before
    // the outer filter on row_num.
    val tableStr =
      """
        |select *
        |from (
        | select *
        | ,row_number() over (partition by windowEnd order by cnt desc) as row_num
        | from agg_table
        |)
        |where row_num <= 10
        |""".stripMargin
    val table = tableEnv.sqlQuery(tableStr)
    table.toRetractStream[Row].print("table")

    env.execute("HotItemsWithSqlNew")
  }
}
注:
1、定义事件时间(event-time)语义时要注意数据类型的转换:Source 数据源中的时间一般以时间戳(BIGINT)的形式保存,可以通过下面的方式将其转换为 TIMESTAMP(3) 类型(注意 FROM_UNIXTIME 接收的是秒级时间戳,若为毫秒需先除以 1000):
TO_TIMESTAMP(FROM_UNIXTIME(action_time, 'yyyy-MM-dd HH:mm:ss'))