当前位置: 首页 > 工具软件 > Sliding Table > 使用案例 >

Flink - Table API 之 window (窗口)

轩辕晔
2023-12-01

窗口

  • 时间语义,要配合窗口操作才能发挥作用
  • 在Table API 和 SQL 中,主要有两种窗口

Group Window(分组窗口)

  • 根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数

Over Windows

  • 针对每个输入行,计算相邻范围内的聚合

Group Windows

  • Group Windows是使用window(w:GroupWindows)子句定义的,并且必须由as子句指定一个别名

  • 为了按窗口对表进行分组,窗口的别名必须在group by子句中,像常规的分组字段一样引用

    Table table = input
    	.window(w:GroupWindow] as "w") //定义窗口,别名为"w"
    	.groupBy("w","a")  // 按照字段a和窗口w分组
    	.select("a,b.sum"); // 聚合
    
  • Table API 提供了一组具有特定语义的预定义Window类,这些类会被转换为底层DataStream或DataSet的窗口操作

滚动窗口(Tumbling windows)

  • 滚动窗口要用Tunble类来定义

    //Tumbling Event-time Window
    .window(Tumble.over("10.minutes").on("rowtime").as("w"))
    
    // Tumbling Processing-tiem Window
    .window(Tumble.over("10.minutes").on("proctime").as("w"))
    
    // Tumbling Row-count Window
    .window(Tumble.over("10.rows").on("proctime").as("w"))
    

滑动窗口(Sliding windows)

  • 滑动窗口要用Slide类来定义

    //Sliding Event-time Window
    .window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
    
    // Sliding Processing-tiem Window
    .window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))
    
    // Sliding Row-count Window
    .window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))
    

会话窗口 (Session Window)

  • 会话窗口要用Session类来定义

    // Session Event-time window
    .window(Session.withGap("10.minutes").on("rowtime").as("w"))
    
    // Session Processing-time window
    .window(Session.withGap("10.minutes").on("proctime").as("w"))
    

SQL中的Group Windows

  • Group Windows定义在SQL查询的Group By子句中

    TUMBLE(time_attr,interval)

    • 定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口字段

    HOP(time_attr,interval,interval)

    • 定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三是窗口长度

    SESSION(time_attr,interval)

    • 定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔。
  • 案例代码

    public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism(1);
    
            DataStream<String> fileDataStream = env.readTextFile("data/temps.txt");
    
    
            DataStream<TempInfo> dataStream = fileDataStream.map(new MapFunction<String, TempInfo>() {
                @Override
                public TempInfo map(String value) throws Exception {
                    String[] split = value.split(",");
                    return new TempInfo(split[0], new Long(split[1]), new Double(split[2]));
                }
            }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TempInfo>(Time.seconds(1)) {
                @Override
                public long extractTimestamp(TempInfo element) {
                    return element.getTimeStamp() * 1000L;
                }
            });
    
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            //Table dataTable = tableEnv.fromDataStream(dataStream, "id, timeStamp.rowtime as ts , temp, pt.proctime");
    
            //Table dataTable = tableEnv.fromDataStream(dataStream, "id, timeStamp.rowtime as ts , temp, rt.rowtime");
            Table dataTable = tableEnv.fromDataStream(dataStream,"id, timeStamp as ts, temp, rt.rowtime");
            
            // 构建视图
            tableEnv.createTemporaryView("sensor",dataTable);
    
    
            // 窗口操作
            // 1. Group Window
            // table API
            Table resTable = dataTable.window(Tumble.over("10.seconds").on("rt").as("tw"))
                    .groupBy("id,tw")
                    .select("id, id.count, temp.avg");
    
            // SQL
           Table resSqlTable = tableEnv.sqlQuery("SELECT id, count(id) as cnt, avg(temp) as avgTemp, tumble_end(rt,interval '10' second)" +
                    "from sensor group by id, tumble(rt,interval '10' second) ");
    
    
    
            // 2. Over Window
            // table API
            Table overTable = dataTable.window(Over.partitionBy("id").orderBy("rt").preceding("2.rows").as("ow"))
                    .select("id, rt, id.count over ow,temp.avg over ow");
    
            // SQL
            tableEnv.sqlQuery("select id, rt, count(id) over ow, avg(temp) over ow "+
                    " from sensor "+
                    " window ow as (partition by id order by rt rows between 2 preceding and current row)");
    
    
    
            tableEnv.toAppendStream(resTable, Row.class).print("group window");
            tableEnv.toAppendStream(resSqlTable, Row.class).print("resSqlTable");
            tableEnv.toRetractStream(overTable,Row.class).print("over window");
    
            env.execute();
        }
    

Over Windows

  • Over window聚合是标准SQL中已有的(over子句),可以在查询的SELECT 子句中定义

  • Over window聚合,会针对每个输入行,计算相邻行范围内的聚合

  • Over windows使用window(w:overwindows*)子句定义,并在select()方法中通过别名来引用

    Table table = input
    	.window([w:OverWindow] as "w")
    	.select("a, b.sum over w, c.min over w");
    
  • Table API 提供了Over类,来配置Over窗口的属性

无界 Over Windows

  • 可以在事件时间或处理时间,以及指定为时间间隔,或行计数的范围内,定义Over windows
  • 无界的over windows 是使用常量指定的
// 无界的事件时间 over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RANGE).as("w"))

// 无界的处理时间 over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w"))

// 无界的事件时间 Row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_ROW).as("w"))

// 无界的处理时间 Row-count over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_ROW).as("w"))

有界 Over Windows

  • 有界的over window 是用间隔的大小指定的
// 有界的事件时间over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))

// 有界的处理时间 over window
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))

// 有界的事件时间 Row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))

// 有届的处理时间 Row-count over window
.window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))

SQL中的Over Windows

  • 用Over做窗口聚合时,所有聚合必须在同一个窗口上定义,也就是说必须是相同的分区,排序和范围

  • 目前仅支持在当前行范围之前的窗口

  • ORDER BY 必须在单一的时间属性上指定

    SELECT COUNT(amount) OVER(
    	PARTITION BY user
    	ORDER BY proctime
    	ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
    FROM Orders
    
 类似资料: