Group Windows
Over Windows
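Group windows are defined with the window(w: GroupWindow) clause and must be given an alias with the as clause.
To group a table by window, the window alias must be referenced in the groupBy clause, just like a regular grouping field.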
Table table = input
.window([w: GroupWindow] as "w") // define the window with alias "w"
.groupBy("w, a") // group by window w and field a
.select("a, b.sum"); // aggregate
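The Table API provides a set of predefined Window classes with specific semantics; these are translated into window operations on the underlying DataStream or DataSet.
Tumbling windows are defined with the Tumble class.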
//Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"))
// Tumbling Processing-time Window
.window(Tumble.over("10.minutes").on("proctime").as("w"))
// Tumbling Row-count Window
.window(Tumble.over("10.rows").on("proctime").as("w"))
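To make the tumbling semantics concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a timestamp falls into exactly one fixed-size, non-overlapping window. The class and method names are illustrative, and aligning windows to multiples of the window size since epoch 0 is an assumption of the sketch, not a statement about Flink internals.

```java
// Sketch only: models tumbling-window assignment without Flink.
class TumblingWindowSketch {
    // Inclusive start of the window that contains timestampMs.
    // Assumes windows are aligned to multiples of sizeMs since epoch 0.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Exclusive end of the same window.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        long ts = 25 * 60 * 1000L + 30_000L; // 25 min 30 s after epoch
        System.out.println(windowStart(ts, tenMinutes)); // 1200000 (minute 20)
        System.out.println(windowEnd(ts, tenMinutes));   // 1800000 (minute 30)
    }
}
```

Every timestamp maps to a single window, which is why a tumbling window needs only a size and a time attribute.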
Sliding windows are defined with the Slide class.
//Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
// Sliding Processing-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))
// Sliding Row-count Window
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))
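Unlike a tumbling window, a sliding window can assign one row to several overlapping windows. The plain-Java sketch below (no Flink dependency) lists the start times of all windows that contain a given timestamp. The names are illustrative, and the alignment of window starts to multiples of the slide since epoch 0 is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: models sliding-window assignment without Flink.
class SlidingWindowSketch {
    // Returns the start of every window [start, start + sizeMs) that contains
    // timestampMs, newest window first. Window starts are assumed to be
    // multiples of slideMs.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 10 * 60 * 1000L;  // 10 minutes
        long slide = 5 * 60 * 1000L;  // 5 minutes
        long ts = 12 * 60 * 1000L;    // minute 12
        // Minute 12 lies in [10, 20) and [5, 15).
        System.out.println(windowStarts(ts, size, slide)); // [600000, 300000]
    }
}
```

With a 10-minute size and a 5-minute slide, each row contributes to two windows, so aggregates are emitted twice as often as with a tumbling window of the same size.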
Session windows are defined with the Session class.
// Session Event-time window
.window(Session.withGap("10.minutes").on("rowtime").as("w"))
// Session Processing-time window
.window(Session.withGap("10.minutes").on("proctime").as("w"))
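Session windows have no fixed size: a session stays open as long as rows keep arriving within the gap. The following plain-Java sketch (no Flink dependency) groups timestamps into sessions. The names are illustrative, and treating an event that arrives exactly one gap after the previous one as the start of a new session is a simplifying assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: models session-window grouping without Flink.
class SessionWindowSketch {
    // Returns {start, end} pairs, where end = last event in the session + gapMs.
    static List<long[]> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            if (!result.isEmpty() && ts < result.get(result.size() - 1)[1]) {
                // Event falls inside the open session: extend its end.
                result.get(result.size() - 1)[1] = ts + gapMs;
            } else {
                // Gap exceeded (or first event): open a new session.
                result.add(new long[] {ts, ts + gapMs});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long gap = 10 * 60 * 1000L; // 10 minutes
        long[] events = {0L, 300_000L, 1_800_000L, 1_980_000L}; // minutes 0, 5, 30, 33
        for (long[] s : sessions(events, gap)) {
            System.out.println(s[0] + " .. " + s[1]);
        }
        // 0 .. 900000
        // 1800000 .. 2580000
    }
}
```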
In SQL, group windows are defined in the GROUP BY clause of the query.
TUMBLE(time_attr,interval)
HOP(time_attr,interval,interval)   (first interval: slide; second interval: window size)
SESSION(time_attr,interval)
Example code
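// Imports assume Flink 1.10; from Flink 1.11 on, StreamTableEnvironment lives in
// org.apache.flink.table.api.bridge.java. TempInfo is a user-defined POJO in the same package.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class WindowExample { // class name is illustrative
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // On Flink versions before 1.12 you may also need:
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Each line is parsed as: id,timestamp,temperature
        DataStream<String> fileDataStream = env.readTextFile("data/temps.txt");
        DataStream<TempInfo> dataStream = fileDataStream.map(new MapFunction<String, TempInfo>() {
            @Override
            public TempInfo map(String value) throws Exception {
                String[] split = value.split(",");
                return new TempInfo(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TempInfo>(Time.seconds(1)) {
            @Override
            public long extractTimestamp(TempInfo element) {
                // The timestamp field is in seconds; Flink expects milliseconds.
                return element.getTimeStamp() * 1000L;
            }
        });

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Alternative schema definitions:
        //Table dataTable = tableEnv.fromDataStream(dataStream, "id, timeStamp.rowtime as ts , temp, pt.proctime");
        //Table dataTable = tableEnv.fromDataStream(dataStream, "id, timeStamp.rowtime as ts , temp, rt.rowtime");
        // rt is appended as an event-time attribute taken from the record timestamps
        Table dataTable = tableEnv.fromDataStream(dataStream, "id, timeStamp as ts, temp, rt.rowtime");
        // Register the table as a view so SQL queries can reference it
        tableEnv.createTemporaryView("sensor", dataTable);

        // Window operations
        // 1. Group Window
        // Table API
        Table resTable = dataTable.window(Tumble.over("10.seconds").on("rt").as("tw"))
                .groupBy("id,tw")
                .select("id, id.count, temp.avg");
        // SQL
        Table resSqlTable = tableEnv.sqlQuery("SELECT id, count(id) as cnt, avg(temp) as avgTemp, tumble_end(rt,interval '10' second) " +
                "from sensor group by id, tumble(rt,interval '10' second) ");

        // 2. Over Window
        // Table API
        Table overTable = dataTable.window(Over.partitionBy("id").orderBy("rt").preceding("2.rows").as("ow"))
                .select("id, rt, id.count over ow,temp.avg over ow");
        // SQL
        Table overSqlTable = tableEnv.sqlQuery("select id, rt, count(id) over ow, avg(temp) over ow " +
                " from sensor " +
                " window ow as (partition by id order by rt rows between 2 preceding and current row)");

        tableEnv.toAppendStream(resTable, Row.class).print("group window");
        tableEnv.toAppendStream(resSqlTable, Row.class).print("resSqlTable");
        tableEnv.toRetractStream(overTable, Row.class).print("over window");
        tableEnv.toAppendStream(overSqlTable, Row.class).print("overSqlTable");
        env.execute();
    }
}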
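The example relies on a TempInfo class that is not shown. A possible shape, inferred only from how the example uses it (a three-argument constructor, getTimeStamp(), and the field names id, timeStamp, temp), is sketched below. The field types, the fromCsv helper, and the sample line in main are assumptions for illustration.

```java
// Hypothetical reconstruction of TempInfo; the original class is not shown.
// In a real project, declare it public in its own file: Flink only treats
// public classes with a public no-argument constructor as POJOs.
class TempInfo {
    private String id;
    private Long timeStamp; // assumed to be epoch seconds
    private Double temp;

    public TempInfo() {} // no-argument constructor required for POJO handling

    public TempInfo(String id, Long timeStamp, Double temp) {
        this.id = id;
        this.timeStamp = timeStamp;
        this.temp = temp;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Long getTimeStamp() { return timeStamp; }
    public void setTimeStamp(Long timeStamp) { this.timeStamp = timeStamp; }
    public Double getTemp() { return temp; }
    public void setTemp(Double temp) { this.temp = temp; }

    // Illustrative helper mirroring the parsing done in the map function.
    static TempInfo fromCsv(String line) {
        String[] split = line.split(",");
        return new TempInfo(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
    }

    public static void main(String[] args) {
        TempInfo info = fromCsv("sensor_1,1547718199,35.8"); // made-up sample line
        System.out.println(info.getId() + " " + info.getTimeStamp() + " " + info.getTemp());
    }
}
```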
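Over window aggregation is part of standard SQL (the OVER clause) and can be defined in the SELECT clause of a query.
An over window aggregation computes, for each input row, an aggregate over a range of neighboring rows.
Over windows are defined with the window(w: OverWindow*) clause and are referenced by alias in the select() method.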
Table table = input
.window([w:OverWindow] as "w")
.select("a, b.sum over w, c.min over w");
The Table API provides the Over class to configure the properties of an over window.
// Unbounded event-time over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RANGE).as("w"))
// Unbounded processing-time over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w"))
// Unbounded event-time row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_ROW).as("w"))
// Unbounded processing-time row-count over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_ROW).as("w"))
// Bounded event-time over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))
// Bounded processing-time over window
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))
// Bounded event-time row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))
// Bounded processing-time row-count over window
.window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))
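The difference between a time-bounded and a row-count-bounded over window is what limits the frame. As a rough illustration of the time-bounded case (preceding "1.minutes"), the plain-Java sketch below (no Flink dependency) computes, for each row of a single partition, the average over all rows whose timestamp lies within the last minute. The names are illustrative, and the sketch assumes rows are already sorted with strictly increasing timestamps.

```java
import java.util.Arrays;

// Sketch only: models a time-bounded over window for one partition.
class RangeOverWindowSketch {
    // out[i] = average of values[j] for all j <= i with ts[j] >= ts[i] - rangeMs.
    // Assumes ts is strictly increasing.
    static double[] rangeAvg(long[] ts, double[] values, long rangeMs) {
        double[] out = new double[ts.length];
        int lo = 0;       // index of the oldest row still inside the frame
        double sum = 0.0; // running sum of the rows inside the frame
        for (int i = 0; i < ts.length; i++) {
            sum += values[i];
            while (ts[lo] < ts[i] - rangeMs) { // drop rows older than the range
                sum -= values[lo];
                lo++;
            }
            out[i] = sum / (i - lo + 1);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 30_000L, 70_000L, 200_000L};
        double[] values = {10, 20, 30, 40};
        System.out.println(Arrays.toString(rangeAvg(ts, values, 60_000L)));
        // [10.0, 15.0, 25.0, 40.0]
    }
}
```

The number of rows in the frame varies with how densely the rows arrive, whereas a row-count window always looks at a fixed number of rows.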
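When aggregating with Over, all aggregates must be defined over the same window, that is, with the same partitioning, ordering, and range.
Currently, only windows that range from PRECEDING rows up to the current row are supported; FOLLOWING is not supported.
ORDER BY must be specified on a single time attribute.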
SELECT COUNT(amount) OVER(
PARTITION BY user
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders
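To show what the query above computes for each input row, here is a plain-Java sketch (no Flink dependency) of COUNT over ROWS BETWEEN 2 PRECEDING AND CURRENT ROW, partitioned by user. The names are illustrative, rows are assumed to arrive already ordered by time, and amounts are assumed non-null so every row in the frame is counted.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Sketch only: models a row-count over window partitioned by user.
class RowsOverWindowSketch {
    // counts[i] = number of rows in the frame made of row i and up to
    // `preceding` earlier rows of the same user.
    static int[] countOver(String[] users, int[] amounts, int preceding) {
        Map<String, Deque<Integer>> frames = new HashMap<>(); // one frame per partition
        int[] counts = new int[users.length];
        for (int i = 0; i < users.length; i++) {
            Deque<Integer> frame = frames.computeIfAbsent(users[i], u -> new ArrayDeque<>());
            frame.addLast(amounts[i]);
            if (frame.size() > preceding + 1) {
                frame.removeFirst(); // evict the row that fell out of the frame
            }
            counts[i] = frame.size();
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] users = {"a", "b", "a", "a", "a", "b"};
        int[] amounts = {1, 2, 3, 4, 5, 6};
        System.out.println(Arrays.toString(countOver(users, amounts, 2)));
        // [1, 1, 2, 3, 3, 2]
    }
}
```

One result is produced per input row, which is the key difference from a group window, where one result is produced per window and group.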