基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时间语义和时间数据来源的信息
Table 可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳
时间属性,可以是每个表schema的一部分。一旦定义了时间属性,它就可以作为一个字段引用,并且可以在基于时间的操作中使用
时间属性的行为类似于常规时间戳,可以访问,并且进行计算
处理时间语义下,允许表处理程序根据机器的本地时间生成结果。它是时间的最简单概念。它既不需要提取时间戳,也不需要生成 watermark
由 DataStream 转换成表时指定
在定义Schema期间,可以使用.proctime,指定字段名定义处理时间字段
这个proctime属性只能通过附加逻辑字段,来扩展物理schema。因此,只能在schema定义的末尾定义它
最简单最直观的方法。
// 将流转成表,定义时间特性
Table dataTable = tableEnv.fromDataStream(dataStream,
"id, timestamp as ts, temperature as temp, pt.proctime");
dataTable.printSchema();
// root
// |-- id: STRING
// |-- ts: BIGINT
// |-- temp: DOUBLE
// |-- pt: TIMESTAMP(3) *PROCTIME*
tableEnv.toAppendStream(dataTable, Row.class).print("dataTable");
// dataTable> sensor_1,1547718199,35.8,2021-09-03 03:25:44.725
// dataTable> sensor_6,1547718201,15.4,2021-09-03 03:25:44.734
// dataTable> sensor_7,1547718202,6.7,2021-09-03 03:25:44.734
// dataTable> sensor_10,1547718205,38.1,2021-09-03 03:25:44.734
// dataTable> sensor_1,1547710000,36.8,2021-09-03 03:25:44.735
// dataTable> sensor_1,1547719999,34.8,2021-09-03 03:25:44.735
// dataTable> sensor_1,1547715555,37.8,2021-09-03 03:25:44.735
定义 Table Schema 时指定
连接外部系统的时候,可以用这种方法。
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
.field("pt", DataTypes.TIMESTAMP(3))
.proctime()
)
在创建表的 DDL 中定义
要用这个方法的话需要用blinn。
String sinkDDL =
"create table dataTable (" +
" id varchar(20) not null, " +
" ts bigint, " +
" temperature double, " +
" pt AS PROCTIME() " +
") with (" +
" 'connector.type' = 'filesystem', " +
" 'connector.path' = '/sensor.txt', " +
" 'format.type' = 'csv')";
tableEnv.sqlUpdate(sinkDDL);
正常情况下,都是使用事件时间的。
事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样即使在有乱序事件或者延迟事件时,也可以获得正确的结果。
为了处理无序事件,并区分流中的准时和迟到事件;Flink 需要从事件数据中,提取时间戳,并用来推进事件时间的进展
在定义事件时间时,需要先将当前环境设为时间事件
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
定义事件时间,同样有三种方法:
由 DataStream 转换成表时指定
在 DataStream 转换成 Table,使用 .rowtime 可以定义事件时间属性,最常见和最推荐的方法。
// 从文件中读取数据,得到一个DataStream
String path = "/home/lxj/workspace/Flink/src/main/resources/sensor.txt";
DataStream<String> inputDataStream = env.readTextFile(path);
// 转换成POJO
DataStream<SensorReading> dataStream = inputDataStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
// 声明WM以及WM的延迟时间
@Override
public long extractTimestamp(SensorReading sensorReading) {
return sensorReading.getTimestamp() * 1000L;
}
});
// 将流转成表,定义时间特性
Table dataTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature as temp");
dataTable.printSchema();
// root
// |-- id: STRING
// |-- ts: TIMESTAMP(3) *ROWTIME*
// |-- temp: DOUBLE
tableEnv.toAppendStream(dataTable, Row.class).print("dataTable");
// dataTable> sensor_1,2019-01-17 09:43:19.0,35.8
// dataTable> sensor_6,2019-01-17 09:43:21.0,15.4
// dataTable> sensor_7,2019-01-17 09:43:22.0,6.7
// dataTable> sensor_10,2019-01-17 09:43:25.0,38.1
// dataTable> sensor_1,2019-01-17 07:26:40.0,36.8
// dataTable> sensor_1,2019-01-17 10:13:19.0,34.8
// dataTable> sensor_1,2019-01-17 08:59:15.0,37.8
Table dataTable2 = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp, rt.rowtime");
dataTable2.printSchema();
// root
// |-- id: STRING
// |-- ts: BIGINT
// |-- temp: DOUBLE
// |-- rt: TIMESTAMP(3) *ROWTIME*
tableEnv.toAppendStream(dataTable2, Row.class).print("dataTable2");
// dataTable2> sensor_1,1547718199,35.8,2019-01-17 09:43:19.0
// dataTable2> sensor_6,1547718201,15.4,2019-01-17 09:43:21.0
// dataTable2> sensor_7,1547718202,6.7,2019-01-17 09:43:22.0
// dataTable2> sensor_10,1547718205,38.1,2019-01-17 09:43:25.0
// dataTable2> sensor_1,1547710000,36.8,2019-01-17 07:26:40.0
// dataTable2> sensor_1,1547719999,34.8,2019-01-17 10:13:19.0
// dataTable2> sensor_1,1547715555,37.8,2019-01-17 08:59:15.0
定义 Table Schema 时指定
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.rowtime(
new Rowtime()
// 从字段中提取时间戳
.timestampsFromField("timestamp")
.watermarksPeriodicBounded(1000)
// watermark 延迟 1 秒
)
.field("temperature", DataTypes.DOUBLE())
)
在创建表的 DDL 中定义
String sinkDDL=
"create table dataTable (" +
" id varchar(20) not null, " +
" ts bigint, " +
" temperature double, " +
" rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), " +
" watermark for rt as rt - interval '1' second" +
") with (" +
" 'connector.type' = 'filesystem', " +
" 'connector.path' = '/sensor.txt', " +
" 'format.type' = 'csv')";
tableEnv.sqlUpdate(sinkDDL);
时间语义,要配合窗口操作才能发挥作用
在 Table API 和 SQL 中,主要有两种窗口
Group Windows(分组窗口) 根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数
Over Windows 针对每个输入行,计算相邻行范围内的聚合,开窗函数
Group Windows 是使用 window(w:GroupWindow)子句定义的,并且必须由as子句指定一个别名。
为了按窗口对表进行分组,窗口的别名必须在 group by 子句中,像常规的分组字段一样引用
Table table = input
.window([w: GroupWindow] as "w") // 定义窗口,别名为 w
.groupBy("w, a") // 按照字段 a 和窗口 w 分组
.select("a, b.sum"); // 聚合
Table API 提供了一组具有特定语义的预定义 Window 类,这些类会被转换为底层 DataStream 或 DataSet 的窗口操作
滚动窗口要用 Tumble 类来定义
// Tumbling Event-time Window-滚动事件时间窗口
// 第一个参数为时间/计数,第二个为时间类型对应的字段(自定义的字段名),第三个为别名
.window(Tumble.over("10.minutes").on("rowtime").as("w"))
// Tumbling Processing-time Window-滚动处理时间窗口
.window(Tumble.over(" 10.minutes ").on("proctime ").as("w"))
// Tumbling Row-count Window-滚动计数窗口
.window(Tumble.over(" 10.rows ").on("proctime ").as("w"))
滑动窗口要用 Slide 类来定义
// Sliding Event-time Window-滑动事件时间窗口
// every为滑动步长
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
// Sliding Processing-time window-滑动处理时间窗口
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))
// Sliding Row-count window-滑动计数窗口
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))
会话窗口要用 Session 类来定义
// Session Event-time Window-事件时间
.window(Session.withGap("10.minutes").on("rowtime").as("w"))
// Session Processing-time Window-处理时间
.window(Session.withGap("10.minutes").on("proctime").as("w"))
Group Windows 定义在 SQL 查询的 Group By 子句中
TUMBLE(time_attr, interval)
定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度
HOP(time_attr, interval, interval)
定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是
窗口长度
SESSION(time_attr, interval)
定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔
// 窗口操作
// Group Window
// TableAPI
Table resultTable = dataTable2
.window(Tumble.over("10.seconds").on("rt").as("tw"))
.groupBy("id, tw")
.select("id, id.count, temp.avg, tw.end");
tableEnv.toAppendStream(resultTable, Row.class).print("table");
// table> sensor_1,1,36.8,2019-01-17 07:26:50.0
// table> sensor_1,1,37.8,2019-01-17 08:59:20.0
// table> sensor_1,1,35.8,2019-01-17 09:43:20.0
// table> sensor_6,1,15.4,2019-01-17 09:43:30.0
// table> sensor_10,1,38.1,2019-01-17 09:43:30.0
// table> sensor_7,1,6.7,2019-01-17 09:43:30.0
// table> sensor_1,1,34.8,2019-01-17 10:13:20.0
// SQL实现
tableEnv.createTemporaryView("sensor", dataTable2);
Table sqlResultTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp, tumble_end(rt, interval '10' second) from sensor group by id, tumble(rt, interval '10' second)");
tableEnv.toRetractStream(sqlResultTable, Row.class).print("sql");
// sql> (true,sensor_1,1,36.8,2019-01-17 07:26:50.0)
// sql> (true,sensor_1,1,37.8,2019-01-17 08:59:20.0)
// sql> (true,sensor_1,1,35.8,2019-01-17 09:43:20.0)
// sql> (true,sensor_6,1,15.4,2019-01-17 09:43:30.0)
// sql> (true,sensor_10,1,38.1,2019-01-17 09:43:30.0)
// sql> (true,sensor_7,1,6.7,2019-01-17 09:43:30.0)
// sql> (true,sensor_1,1,34.8,2019-01-17 10:13:20.0)
Over window 聚合是标准 SQL 中已有的(over 子句),可以在查询的SELECT 子句中定义
Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合
Over windows 使用 window(w:overwindows*)子句定义,并在 select()方法中通过别名来引用
Table table = input
.window([w: OverWindow] as "w")
.select("a, b.sum over w, c.min over w");
Table API 提供了 Over 类,来配置 Over 窗口的属性
可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义 Over windows
无界的 over window 是使用常量指定的
// 无界的事件时间 over window,preceding可以省略,省略默认就是UNBOUNDED_RANGE
// 只能取当前行及之前的数据,不能取之后的数据
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RANGE).as("w"))
// 无界的处理时间 over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w"))
// 无界的事件时间 Row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_ROW).as("w"))
// 无界的处理时间 Row-count over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_ROW).as("w"))
有界的 over window 是用间隔的大小指定的
// 有界的事件时间 over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))
// 有界的处理时间 over window
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))
// 有界的事件时间 Row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))
// 有界的处理时间 Row-count over window
.window(Over.partitionBy("a").orderBy("procime").preceding("10.rows").as("w"))
用 Over 做窗口聚合时,所有聚合必须在同一窗口上定义,也就是说必须是相同的分区、排序和范围
目前仅支持在当前行范围之前的窗口
ORDER BY 必须在单一的时间属性上指定
SELECT COUNT(amount) OVER (
PARTITION BY user
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders
// Over Window
// TableAPI
Table overResult = dataTable2.window(Over.partitionBy("id").orderBy("rt").preceding("2.rows").as("ow"))
.select("id, rt, id.count over ow, temp.avg over ow");
Table overSqlResult = tableEnv.sqlQuery("select id, rt, count(id) over ow, avg(temp) over ow from sensor window ow as (partition by id order by rt rows between 2 preceding and current row)");
tableEnv.toAppendStream(overResult, Row.class).print("table");
// table> sensor_1,2019-01-17 07:26:40.0,1,36.8
// table> sensor_1,2019-01-17 08:59:15.0,2,37.3
// table> sensor_1,2019-01-17 09:43:19.0,3,36.8
// table> sensor_6,2019-01-17 09:43:21.0,1,15.4
// table> sensor_7,2019-01-17 09:43:22.0,1,6.7
// table> sensor_10,2019-01-17 09:43:25.0,1,38.1
// table> sensor_1,2019-01-17 10:13:19.0,3,36.13333333333333
tableEnv.toRetractStream(overSqlResult, Row.class).print("sql");
// sql> (true,sensor_1,2019-01-17 07:26:40.0,1,36.8)
// sql> (true,sensor_1,2019-01-17 08:59:15.0,2,37.3)
// sql> (true,sensor_1,2019-01-17 09:43:19.0,3,36.8)
// sql> (true,sensor_6,2019-01-17 09:43:21.0,1,15.4)
// sql> (true,sensor_7,2019-01-17 09:43:22.0,1,6.7)
// sql> (true,sensor_10,2019-01-17 09:43:25.0,1,38.1)
// sql> (true,sensor_1,2019-01-17 10:13:19.0,3,36.13333333333333)