To execute Table API and SQL you also need a "table environment" (TableEnvironment). It is mainly responsible for:

- registering catalogs and tables;
- executing SQL queries;
- registering user-defined functions (UDFs);
- converting between DataStream and Table (via the stream-specific StreamTableEnvironment).
There are four ways to create a table environment:
// **********************
// Option 1: FLINK STREAMING QUERY
// **********************
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// ******************
// Option 2: FLINK BATCH QUERY
// ******************
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// **********************
// Option 3: BLINK STREAMING QUERY
// **********************
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// Option 4: BLINK BATCH QUERY
// ******************
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
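Note that the legacy planner was removed in Flink 1.14, so on newer versions the planner choice disappears entirely and a minimal sketch like this is enough:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode() // or .inBatchMode()
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);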
// TEMPORARY: optional; with it the table is temporary, without it the table is permanent
// connector: connects to an external data source
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )");
Example: connecting to Kafka:
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)
Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");
// Create a virtual table: register newTable as a virtual (temporary) view named NewTable
tableEnv.createTemporaryView("NewTable", newTable);
Table table = tableEnvironment.sqlQuery("select * from myTable");
Table table = ...
// Query the data using the Table API
Table tableApi = table
.select($("name"), $("time"))
.where($("name").isEqual("fzk"));
// Register a table used for writing data to an external system
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
// The result table obtained from the query/transformation
Table result = ...
// Write the result table into the registered output table
result.executeInsert("OutputTable");
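Putting the pieces above together, a minimal end-to-end sketch; the table names and connector choices here are illustrative, and it assumes the datagen and print connectors are on the classpath:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

// input table backed by the datagen connector (generates random rows)
tableEnv.executeSql(
        "CREATE TABLE MyTable (user_id BIGINT, amount DOUBLE) " +
        "WITH ('connector' = 'datagen')");
// output table backed by the print connector (writes rows to stdout)
tableEnv.executeSql(
        "CREATE TABLE OutputTable (user_id BIGINT, amount DOUBLE) " +
        "WITH ('connector' = 'print')");

// query, then write the result into the registered output table
Table result = tableEnv.sqlQuery("SELECT user_id, amount FROM MyTable WHERE amount > 0.5");
result.executeInsert("OutputTable");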
// Option 1: pass in the stream (DataStream); instance method of StreamTableEnvironment
Table fromDataStream(DataStream<T> dataStream);
// Option 2: first argument: the stream (DataStream); following arguments: the table fields
Table fromDataStream(DataStream<T> dataStream, Expression... fields);
// Examples:
Table table = tableEnvironment.fromDataStream(sourceData);
Table table = tableEnvironment.fromDataStream(sourceData, $("f0").as("name"), $("f1").as("time"));
// Convert a changelog stream (containing inserts, updates, and deletes) into a Table
Table table = tableEnvironment.fromChangelogStream(sourceData);
// Create a temporary view; first argument: view name, second argument: the stream, remaining arguments: the table fields
tableEnvironment.createTemporaryView("myTable", sourceData, $("f0").as("name"), $("f1").as("time"));
DataStream<Row> rowDataStream = tableEnvironment.toDataStream(table);
DataStream<Row> rowDataStream = tableEnvironment.toChangelogStream(table);
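A minimal round-trip sketch combining these conversions; the Tuple2 source and its field values are illustrative:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// hypothetical source stream of (name, timestamp) pairs
DataStream<Tuple2<String, Long>> sourceData =
        env.fromElements(Tuple2.of("fzk", 1000L), Tuple2.of("abc", 2000L));

// DataStream -> Table, renaming the tuple fields
Table table = tableEnv.fromDataStream(sourceData, $("f0").as("name"), $("f1").as("ts"));

// Table -> DataStream<Row>, then print and run
DataStream<Row> rows = tableEnv.toDataStream(table);
rows.print();
env.execute();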
| Data Type | Remarks for Data Type |
|---|---|
| CHAR | |
| VARCHAR | |
| STRING | |
| BOOLEAN | |
| BYTES | BINARY and VARBINARY are not supported yet. |
| DECIMAL | Supports fixed precision and scale. |
| TINYINT | |
| SMALLINT | |
| INTEGER | |
| BIGINT | |
| FLOAT | |
| DOUBLE | |
| DATE | |
| TIME | Supports only a precision of 0. |
| TIMESTAMP | |
| TIMESTAMP_LTZ | |
| INTERVAL | Supports only intervals of MONTH and SECOND(3). |
| ARRAY | |
| MULTISET | |
| MAP | |
| ROW | |
| RAW | |
| structured types | Only exposed in user-defined functions yet. |
| Tuple | |
| POJO | |
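On the Java side these SQL types correspond to the DataTypes factory; a small sketch of a few of the mappings listed above:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

DataType s   = DataTypes.STRING();                  // STRING
DataType dec = DataTypes.DECIMAL(10, 2);            // DECIMAL with fixed precision and scale
DataType ts  = DataTypes.TIMESTAMP(3);              // TIMESTAMP(3)
DataType row = DataTypes.ROW(                       // ROW with named fields
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("cnt", DataTypes.BIGINT()));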
For time already in TIMESTAMP format: use the field directly.
-- WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
-- WATERMARK FOR <event-time field> AS <event-time field> - INTERVAL '<duration>' <unit>
-- sets a watermark delay of 5 seconds
CREATE TABLE EventTable (
  `user` STRING,  -- quoted: USER is a reserved keyword
  url STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  ...
);
For time stored as a long value (epoch milliseconds): first convert it to TIMESTAMP_LTZ.
-- WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
-- WATERMARK FOR <time field> AS <time field> - INTERVAL '<duration>' <unit>
-- sets a watermark delay of 5 seconds
-- ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3): converts the long value into TIMESTAMP_LTZ
CREATE TABLE events (
  `user` STRING,  -- quoted: USER is a reserved keyword
  url STRING,
  ts BIGINT,
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);
// Method 1:
// generate timestamps and watermarks from the events in the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// declare an extra logical field as the event-time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());
// Method 2:
// extract the event time from the first field and generate watermarks
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// the first field is already used for event-time extraction; no extra field is needed
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));
-- user_action_time AS PROCTIME(): PROCTIME() returns the current processing time
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- declare an extra column as the processing-time attribute
) WITH (
  ...
);
DataStream<Tuple2<String, String>> stream = ...;
// the proctime() call declares a processing-time attribute field
Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").proctime());
// Tumbling Event-time Window
table.window(Tumble.over(lit(10).minutes()).on($("rowtime")).as("w"));
// Tumbling Processing-time Window (assuming the processing-time field is "proctime")
table.window(Tumble.over(lit(10).minutes()).on($("proctime")).as("w"));
// Tumbling Row-count Window (assuming the processing-time field is "proctime")
table.window(Tumble.over(rowInterval(10)).on($("proctime")).as("w"));
// Sliding Event-time Window
table.window(Slide.over(lit(10).minutes())
.every(lit(5).minutes())
.on($("rowtime"))
.as("w"));
// Sliding Processing-time Window (assuming the processing-time field is "proctime")
table.window(Slide.over(lit(10).minutes())
.every(lit(5).minutes())
.on($("proctime"))
.as("w"));
// Sliding Row-count Window (assuming the processing-time field is "proctime")
table.window(Slide.over(rowInterval(10)).every(rowInterval(5)).on($("proctime")).as("w"));
// Session Event-time Window
table.window(Session.withGap(lit(10).minutes()).on($("rowtime")).as("w"));
// Session Processing-time Window (assuming the processing-time field is "proctime")
table.window(Session.withGap(lit(10).minutes()).on($("proctime")).as("w"));
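Declaring a window only names it; the aggregation itself happens by grouping on the window alias. A minimal sketch, assuming a table with fields user and price and an event-time attribute rowtime:

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;

// sum the price per user over 10-minute tumbling event-time windows
Table result = table
    .window(Tumble.over(lit(10).minutes()).on($("rowtime")).as("w"))
    .groupBy($("w"), $("user"))
    .select(
        $("user"),
        $("w").start().as("window_start"),
        $("w").end().as("window_end"),
        $("price").sum().as("total_price"));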
-- tables must have time attribute, e.g. `bidtime` in this table
Flink SQL> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
| name | type | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND |
| price | DECIMAL(10, 2) | true | | | |
| item | STRING | true | | | |
+-------------+------------------------+------+-----+--------+---------------------------------+
Flink SQL> SELECT * FROM Bid;
+------------------+-------+------+
| bidtime | price | item |
+------------------+-------+------+
| 2020-04-15 08:05 | 4.00 | C |
| 2020-04-15 08:07 | 2.00 | A |
| 2020-04-15 08:09 | 5.00 | D |
| 2020-04-15 08:11 | 3.00 | B |
| 2020-04-15 08:13 | 1.00 | E |
| 2020-04-15 08:17 | 6.00 | F |
+------------------+-------+------+
-- Tumbling window with a size of 10 minutes. Bid: table name; bidtime: time attribute; INTERVAL '10' MINUTES: the window size
SELECT * FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
);
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+
-- Hopping (sliding) window: size 10 minutes, slide 5 minutes
SELECT * FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
);
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:15 | 2020-04-15 08:25 | 2020-04-15 08:24:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+
-- Cumulative window: maximum size 10 minutes, step 2 minutes
SELECT * FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
);
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:06 | 2020-04-15 08:05:59.999 |
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:12 | 2020-04-15 08:11:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+
Group aggregation is the ordinary SQL GROUP BY statement. Common aggregate functions are SUM(), MAX(), MIN(), AVG(), and COUNT(); they are all many-to-one operations.
SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50
Window TVF aggregation groups by the window columns produced by the TVF (window_start, window_end, window_time):
-- aggregate into 10-minute tumbling windows, grouped by window start and end time
SELECT window_start, window_end, SUM(price)
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
Standard SQL also has a special kind of aggregation that computes an aggregate value for every single row. For example, taking each row as the reference point, we can compute the average of all data within the preceding hour, or the average of the preceding 10 rows. It is as if a window were opened on every row through which data is collected for the statistic; these are the so-called over (window) aggregations.
Over aggregation is fundamentally different from the previous two kinds: group aggregation and window TVF aggregation are both many-to-one, yielding a single result per group, whereas an over aggregation is evaluated once per row, so the number of rows in the table does not shrink at all; it is a many-to-many relationship.
Usage:
SELECT
  <aggregate function> OVER (
    [PARTITION BY <field1>[, <field2>, ...]]
    ORDER BY <time attribute field>
    <window range>),
  ...
FROM ...
PARTITION BY: optional; partitions the data by the given fields so that the aggregate is computed within each partition.
ORDER BY: in Flink, OVER windows must be ordered by a time attribute, in ascending order.
Window range: an over aggregation must specify its range, i.e. how far to extend the rows included in the aggregate. The range is defined with BETWEEN <lower bound> AND <upper bound>, i.e. "from lower bound to upper bound", and comes in two forms: range intervals (RANGE intervals) and row intervals (ROW intervals).
Range intervals (RANGE intervals)
A range interval is prefixed with RANGE and selects a range based on the time field given in ORDER BY, typically a period before the current row's timestamp. For example, to open the window over the hour preceding the current row:
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
-- Example:
SELECT `user`, ts,
  COUNT(url) OVER (
    PARTITION BY `user`
    ORDER BY ts
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS cnt
FROM EventTable
Row intervals (ROW intervals)
A row interval is prefixed with ROWS and directly specifies how many rows to select, counting backwards from the current row. For example, to select the 5 rows before the current row (the aggregate also includes the current row, so 6 rows in total):
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
-- Example:
SELECT `user`, ts,
  COUNT(url) OVER (
    PARTITION BY `user`
    ORDER BY ts
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  ) AS cnt
FROM EventTable
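The same kind of over aggregation can also be written in the Table API; a sketch of the row-interval example above, reusing the EventTable field names:

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.rowInterval;
import static org.apache.flink.table.api.Expressions.CURRENT_ROW;

import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;

// count urls over the 5 rows preceding the current row, per user, ordered by ts
Table result = table
    .window(Over.partitionBy($("user"))
                .orderBy($("ts"))
                .preceding(rowInterval(5L))
                .following(CURRENT_ROW)
                .as("w"))
    .select($("user"), $("ts"), $("url").count().over($("w")).as("cnt"));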
Wrapping common data transformations into reusable units that can be invoked uniformly from SQL queries is what "functions" are.
When the built-in functions cannot cover an actual business requirement, we write our own UDFs. The overall workflow is:
// 1. Implement the custom function (omitted here)
// 2. Register the function
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);
// 3.1. Call the function from the Table API: use call(); first argument: the function name, remaining arguments: the function's parameter values
tableEnv.from("MyTable").select(call("MyFunction", $("myField")));
// 3.2. Call the function in SQL
tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");
Maps zero or more scalar values to a single scalar value.
// 1. Implement the scalar function
//    1) extend ScalarFunction
//    2) the evaluation method must be public and must be named eval
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;
public static class HashFunction extends ScalarFunction {
// accepts input of any type and returns an INT
// @DataTypeHint(inputGroup = InputGroup.ANY) annotates the input type: eval may take arguments of any type
public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
return o.hashCode();
}
}
// 2. Register the function
env.createTemporarySystemFunction("HashFunction", HashFunction.class);
// 3.1 Call the registered function in the Table API
env.from("MyTable").select(call("HashFunction", $("myField")));
// 3.2 Call the registered function in SQL
env.sqlQuery("SELECT HashFunction(myField) FROM MyTable");
Maps zero or more scalar values to a table (a table: n rows, n columns).
In the Table API, a table function is used with .joinLateral(...) or .leftOuterJoinLateral(...). The joinLateral operator (cross-)joins every row of the outer table (the operator's left side) with all rows produced by the table function (the operator's right side). The leftOuterJoinLateral operator does the same, but also keeps the outer row when the table function returns zero rows. In SQL, use JOIN, or a LEFT JOIN with ON TRUE as the condition, together with LATERAL TABLE(<TableFunction>).
// 1. Implement the table function
//    1) extend TableFunction
//    2) the evaluation method must be public and must be named eval
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Note the type annotation here: the output is a Row containing two fields, word and length
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {
public void eval(String str) {
for (String s : str.split(" ")) {
// emit one row of data with collect()
collect(Row.of(s, s.length()));
}
}
}
// 2. Register the function
env.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
// 3.1 Call the registered function in the Table API
env
.from("MyTable")
.joinLateral(call("SplitFunction", $("myField")))
.select($("myField"), $("word"), $("length"));
env
.from("MyTable")
.leftOuterJoinLateral(call("SplitFunction", $("myField")))
.select($("myField"), $("word"), $("length"));
// 3.2 Call the registered function in SQL
// cross join
env.sqlQuery(
"SELECT myField, word, length " +
"FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
// left join
env.sqlQuery(
"SELECT myField, word, length " +
"FROM MyTable " +
"LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE");
// rename the function's output fields in SQL
env.sqlQuery(
"SELECT myField, newWord, newLength " +
"FROM MyTable " +
"LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE");
Maps a table (a table: n rows, n columns) to a new scalar value.
- createAccumulator(): creates an empty accumulator
- accumulate(): called for every row to update the accumulator
- getValue(): called after all rows have been processed, to compute and return the final result
- retract(): must be implemented for aggregations over bounded OVER windows; retracts rows from the accumulator
- merge(): must be implemented for many batch aggregations and for session and tumbling window aggregations; it also helps with optimizations, e.g. the two-phase aggregation optimization requires every AggregateFunction to implement merge
- resetAccumulator(): must be implemented for many batch aggregations; resets the accumulator

import java.util.Iterator;
import org.apache.flink.table.functions.AggregateFunction;

/**
* Accumulator for WeightedAvg.
*/
public static class WeightedAvgAccum {
public long sum = 0;
public int count = 0;
}
// 1. Implement the aggregate function
// extend AggregateFunction and implement/override its methods
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
// Creates an empty accumulator
@Override
public WeightedAvgAccum createAccumulator() {
return new WeightedAvgAccum();
}
// Returns the final result
@Override
public Long getValue(WeightedAvgAccum acc) {
if (acc.count == 0) {
return null;
} else {
return acc.sum / acc.count;
}
}
// Updates the accumulator
public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
acc.sum += iValue * iWeight;
acc.count += iWeight;
}
// Retracts from the accumulator
public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
acc.sum -= iValue * iWeight;
acc.count -= iWeight;
}
// Merges accumulators
public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
Iterator<WeightedAvgAccum> iter = it.iterator();
while (iter.hasNext()) {
WeightedAvgAccum a = iter.next();
acc.count += a.count;
acc.sum += a.sum;
}
}
// Resets the accumulator
public void resetAccumulator(WeightedAvgAccum acc) {
acc.count = 0;
acc.sum = 0L;
}
}
// 2. Register the function
tEnv.createTemporarySystemFunction("wAvg", WeightedAvg.class);
// 3. Use the function
tEnv.sqlQuery("SELECT `user`, wAvg(points, level) AS avgPoints FROM userScores GROUP BY `user`");
Maps one table to another table (a table: n rows, n columns). Table aggregate functions can only be used in the Table API, not in SQL.
- createAccumulator(): creates an empty accumulator
- accumulate(): called for every row to update the accumulator
- emitValue(): computes and returns the final result
- retract(): must be implemented for aggregations over bounded OVER windows; retracts rows from the accumulator
- merge(): must be implemented for many batch aggregations and for streaming session and sliding window aggregations
- resetAccumulator(): must be implemented for many batch aggregations; resets the accumulator

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

/**
* Accumulator for Top2.
*/
public class Top2Accum {
public Integer first;
public Integer second;
}
// 1. Implement the table aggregate function
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {
// Creates an empty accumulator
@Override
public Top2Accum createAccumulator() {
Top2Accum acc = new Top2Accum();
acc.first = Integer.MIN_VALUE;
acc.second = Integer.MIN_VALUE;
return acc;
}
// Updates the accumulator
public void accumulate(Top2Accum acc, Integer v) {
if (v > acc.first) {
acc.second = acc.first;
acc.first = v;
} else if (v > acc.second) {
acc.second = v;
}
}
// Merges accumulators
public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
for (Top2Accum otherAcc : iterable) {
accumulate(acc, otherAcc.first);
accumulate(acc, otherAcc.second);
}
}
// Computes and emits the final result
public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
// emit the value and rank
if (acc.first != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.first, 1));
}
if (acc.second != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.second, 2));
}
}
}
// 2. Register the function
tEnv.createTemporarySystemFunction("top2", Top2.class);
// 3. Use the function
tab.groupBy($("key"))
   .flatAggregate(call("top2", $("a")).as("v", "rank"))
   .select($("key"), $("v"), $("rank"));
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>