当前位置: 首页 > 工具软件 > Sliding Table > 使用案例 >

Flink-TableAPI&SQL快速上手

高嘉熙
2023-12-01

0. 程序流程

  1. 创建表环境
  2. 创建表,用于输入输出
    1. 创建输入表,连接外部系统读取数据
    2. 注册输出表,连接到外部系统,用于输出
  3. 对数据进行表的查询处理
    1. 执行 SQL 对表进行查询转换,得到一个新的表
    2. 使用 Table API 对表进行查询转换,得到一个新的表
  4. 将得到的结果写入输出表

1. 创建表环境

要想执行 Table API 和 SQL 的话,还需要 “表环境”(TableEnvironment)。它主要负责:

  1. 注册 Catalog 和表;
  2. 执行 SQL 查询;
  3. 注册用户自定义函数(UDF);
  4. DataStream 和表之间的转换

如下四种创建表环境方法:

// **********************
// 方式一:FLINK STREAMING QUERY
// **********************
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);



// ******************
// 方式二:FLINK BATCH QUERY
// ******************
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);



// **********************
// 方式三:BLINK STREAMING QUERY
// **********************
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);



// ******************
// 方式四:BLINK BATCH QUERY
// ******************
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

2. 创建表

连接器表(Connector Tables)

Table API Connectors

// TEMPORARY : 可选,加上表示临时表,不加则是永久表
// connector : 连接外部数据源
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )");

连接Kafka示例:

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

虚拟表(Virtual Tables)

Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");
// 创建虚拟表,newTable表创建成虚拟表,名称为:NewTable
tableEnv.createTemporaryView("NewTable", newTable);

3. 表的查询

执行 SQL 进行查询

  • Flink 基于 Apache Calcite 来提供对SQL 的支持,Calcite 是一个为不同的计算平台提供标准 SQL 查询的底层工具,很多大数据框架比如 Apache Hive、Apache Kylin 中的 SQL 支持都是通过集成 Calcite 来实现的
Table table = tableEnvironment.sqlQuery("select * from myTabel");

调用 Table API 进行查询

Table table = ...

// 使用 Table API 数据进行查询
Table tableApi = table
                .select($("name"), $("time"))
                .where($("name").isEqual("fzk"));

4. 输出表

// 注册表,用于输出数据到外部系统
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");

// 经过查询转换,得到结果表
Table result = ...
    
// 将结果表写入已注册的输出表中
result.executeInsert("OutputTable");

5. 表和流的相互转换

流转换成表(fromDataStream)

调用 fromDataStream 方法

  • ⚠️:新增调用 fromDataStream 方法即可,但 更新日志流转换成表调用 fromChangelogStream 方法
// 方式一:传入流数据(DataStream)
Table table = StreamTableEnvironment.fromDataStream(DataStream<T> var1);

// 方式二:第一个参数:传入流数据(DataStream),第二个参数:表字段
Table table = StreamTableEnvironment.fromDataStream(DataStream<T> var1, Expression... var2);

// 示例:
Table table = tableEnvironment.fromDataStream(sourceData);
Table table = tableEnvironment.fromDataStream(sourceData, $("f0").as("name"), $("f1").as("time"));

调用 fromChangelogStream ()方法

  • ⚠️:更新日志流转换成表调用 fromChangelogStream 方法
// 
Table table = tableEnvironment.fromChangelogStream(sourceData);

调用 createTemporaryView() 方法

// 创建临时表,第一个参数:表名,第二个参数:流数据,第三个及以后的参数:表字段
tableEnvironment.createTemporaryView("myTabel", sourceData, $("f0").as("name"), $("f1").as("time"));

表转换成流

调用 toDataStream() 方法

  • ⚠️:如果数据只是新增可以从此方法,如果有更新的操作则需要 调用 toChangelogStream() 方法
DataStream<Row> rowDataStream = tableEnvironment.toDataStream(table);

调用 toChangelogStream() 方法

  • ⚠️:有更新的操作则 调用 toChangelogStream() 方法
DataStream<Row> rowDataStream = tableEnvironment.toChangelogStream(table);

6. 数据格式

Data TypeRemarks for Data Type
CHAR
VARCHAR
STRING
BOOLEAN
BYTESBINARY and VARBINARY are not supported yet.
DECIMALSupports fixed precision and scale.
TINYINT
SMALLINT
INTEGER
BIGINT
FLOAT
DOUBLE
DATE
TIMESupports only a precision of 0.
TIMESTAMP
TIMESTAMP_LTZ
INTERVALSupports only interval of MONTH and SECOND(3).
ARRAY
MULTISET
MAP
ROW
RAW
structured typesOnly exposed in user-defined functions yet.
Tuple
POJO

7. 时间属性和窗口

时间属性

事件时间

DDL方式定义

  • ⚠️:时间时间的数据类型: TIMESTAMP 或者 TIMESTAMP_LTZ
    • TIMESTAMP :年-月-日-时-分-秒(2020-04-15 20:13:40.564)
    • TIMESTAMP_LTZ :带有本地时区信息的时间戳(TIMESTAMP WITH LOCAL TIME ZONE)
  • ⚠️:水位线的数据类型: TIMESTAMP 或者 TIMESTAMP_LTZ

TIMESTAMP 格式的时间:直接使用

# WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
# WATERMARK FOR 「事件时间字段名」 AS 「事件时间字段名」 - INTERVAL '「时长」' 「时间单位」
# 设置「5秒」水位线延迟
CREATE TABLE EventTable(
    user STRING,
    url STRING,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    ...
);

长整型的时间格式的时间:需要先转成 TIMESTAMP_LTZ 格式数据

# WATERMARK FOR ts_ltz AS time_ltz - INTERVAL '5' SECOND
# WATERMARK FOR 「时间字段名」 AS 「时间字段名」 - INTERVAL '「时长」' 「时间单位」
# 设置「5秒」水位线延迟

# ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3):将 长整型 格式的转成 TIMESTAMP_LTZ 格式
CREATE TABLE events (
    user STRING,
    url STRING,
    ts BIGINT,
    ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
    WATERMARK FOR ts_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
    ...
);

数据流转换成表中的定义

// 方法一:
// 基于 stream 中的事件产生时间戳和 watermark
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());


// 方法二:
// 从第一个字段获取事件时间,并且产生 watermark
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));

处理时间

DDL方式定义

# ts AS PROCTIME() 中的 PROCTIME 方法为获取当前处理时间
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (
  ...
);

数据流转换成表中的定义

DataStream<Tuple2<String, String>> stream = ...;
// proctime 方法来获取当前处理时间
Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").proctime());

窗口(Window)

分组窗口(Group Window,1.13之前)

Tumble (滚动窗口)

// Tumbling Event-time Window
table.window(Tumble.over(lit(10).minutes()).on($("rowtime")).as("w"));

// Tumbling Processing-time Window (假设 processing-time 字段为 "proctime")
table.window(Tumble.over(lit(10).minutes()).on($("proctime")).as("w"));

// Tumbling Row-count(行间隔窗口) Window (假设 processing-time 字段为 "proctime")
table.window(Tumble.over(rowInterval(10)).on($("proctime")).as("w"));

Slide (滑动窗口)

// Sliding Event-time Window
table.window(Slide.over(lit(10).minutes())
            .every(lit(5).minutes())
            .on($("rowtime"))
            .as("w"));

// Sliding Processing-time window (假设 processing-time 字段为 "proctime")
table.window(Slide.over(lit(10).minutes())
            .every(lit(5).minutes())
            .on($("proctime"))
            .as("w"));

// Sliding Row-count window(行间隔窗口) (假设 processing-time 字段为 "proctime")
table.window(Slide.over(rowInterval(10)).every(rowInterval(5)).on($("proctime")).as("w"));

Session (回话窗口)

// Session Event-time Window
table.window(Session.withGap(lit(10).minutes()).on($("rowtime")).as("w"));

// Session Processing-time Window (假设 processing-time 字段为 "proctime")
table.window(Session.withGap(lit(10).minutes()).on($("proctime")).as("w"));

窗口表值函数(Windowing TVFs,1.13及以后)

使用窗口函数示例

  • 使用窗口函数会多几三个参数:
    • window_start
    • window_end
    • window_time
-- tables must have time attribute, e.g. `bidtime` in this table
Flink SQL> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
|        name |                   type | null | key | extras |                       watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
|     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
|       price |         DECIMAL(10, 2) | true |     |        |                                 |
|        item |                 STRING | true |     |        |                                 |
+-------------+------------------------+------+-----+--------+---------------------------------+

Flink SQL> SELECT * FROM Bid;
+------------------+-------+------+
|          bidtime | price | item |
+------------------+-------+------+
| 2020-04-15 08:05 |  4.00 | C    |
| 2020-04-15 08:07 |  2.00 | A    |
| 2020-04-15 08:09 |  5.00 | D    |
| 2020-04-15 08:11 |  3.00 | B    |
| 2020-04-15 08:13 |  1.00 | E    |
| 2020-04-15 08:17 |  6.00 | F    |
+------------------+-------+------+

Flink SQL> SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |     window_start |       window_end |            window_time  |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

TUMBLE(滚动窗口)

-- 窗口时间为10分钟的滚动窗口, Bid:表名,bidtime:时间字段,INTERVAL '10' MINUTES:窗口时间10分钟
SELECT * FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
);

+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |     window_start |       window_end |            window_time  |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

HOP(滑动窗口)

-- 窗口时间为10分钟,步长5分钟
SELECT * FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
);

+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |     window_start |       window_end |           window_time   |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:15 | 2020-04-15 08:25 | 2020-04-15 08:24:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

CUMULATE(累加窗口)

-- 窗口时间为10分钟,步长2分钟
SELECT * FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
);

+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |     window_start |       window_end |            window_time  |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:06 | 2020-04-15 08:05:59.999 |
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:12 | 2020-04-15 08:11:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

8、聚合查询

分组聚合

  • 分组聚合就是我们通常SQL写的 group by 语句,常用的聚合函数 SUM()、MAX()、MIN()、AVG()以及 COUNT(),属于多对一的新式

    SELECT SUM(amount)
    FROM Orders
    GROUP BY users
    HAVING SUM(amount) > 50
    

窗口聚合

  • 通过开窗的字段值(window_start、window_end、window_time)进行聚合

    -- 按照窗口的开始时间和结束时间进行 窗口时长为10分钟的窗口 进行聚合
    SELECT window_start, window_end, SUM(price)
      FROM TABLE(
          TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end;
    

Over聚合

  • 在标准 SQL 中还有另外一类比较特殊的聚合方式,可以针对每一行计算一个聚合值。比如说,我们可以以每一行数据为基准,计算它之前 1 小时内所有数据的平均值;也可以计算它之前 10 个数的平均值。就好像是在每一行上打开了一扇窗户、收集数据进行统计一样,这就是所谓的“开窗函数”

  • 开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口 TVF聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果;而开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系

  • 用法:

    SELECT
    <聚合函数> OVER (
        [PARTITION BY <字段 1>[, <字段 2>, ...]]
        ORDER BY <时间属性字段> 
        <开窗范围>),
    FROM ...
    
    • PARTITION BY:

      • (可选)用来指定分区的键(key),类似于 GROUP BY 的分组,这部分是可选的
    • ORDER BY:

      • 表中的数据本身是无序的,所以在 OVER 子句中必须用 ORDER BY 明确地指出数据基于哪个字段排序。在 Flink 的流处理中,目前只支持按照时间属性的升序排列,所以这里 ORDER BY 后面的字段必须是定义好的时间属性
    • 开窗范围:对于开窗函数而言,必须指定开窗的范围,也就是到底要扩展多少行来做聚合。这个范围是由 BETWEEN <下界> AND <上界> 来定义的,也就是“从下界到上界”的范围,分为:范围间隔(RANGE intervals)和行间隔(ROW intervals)

      • 范围间隔(RANGE intervals)

        • 范围间隔以 RANGE 为前缀,就是基于 ORDER BY 指定的时间字段去选取一个范围,一般就是当前行时间戳之前的一段时间。例如开窗范围选择当前行之前 1 小时的数据:

          RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
          
          -- 示例:
          SELECT user, ts,
          COUNT(url) OVER (
              PARTITION BY user
              ORDER BY ts
              RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
          ) AS cnt
          FROM EventTable
          
      • 行间隔(ROW intervals)

        • 行间隔以 ROWS 为前缀,就是直接确定要选多少行,由当前行出发向前选取就可以了。例如开窗范围选择当前行之前的 5 行数据(最终聚合会包括当前行,所以一共 6 条数据):

          ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
          
          -- 示例:
          SELECT user, ts,
          COUNT(url) OVER (
              PARTITION BY user
              ORDER BY ts
              ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
          ) AS cnt
          FROM EventTable
          

9. 函数

把一些数据的转换操作包装起来,嵌入到 SQL 查询中统一调用,这就是“函数”(functions)

内置函数

flink官网内置函数列举

自定义函数

整体使用

当内置函数中的功能满足不了我们的实际业务需求时,我们就需要自己来编写UDF函数来实现业务场景,下面是整体的使用流程:

  1. 编写 自定义函数(见下面的 函数使用说明
  2. 注册函数
  3. 函数调用
    1. 使用 Table API 调用函数,需要使用 call()方法来调用自定义函数
    2. 在 SQL 中调用函数
// 1. 编写自定义函数。。。略
// 2. 注册函数
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);
// 3.1. 使用 Table API 调用函数,需要使用 call()方法来调用自定义函数,第一个参数:函数名,后面的参数:函数所需的参数值
tableEnv.from("MyTable").select(call("MyFunction", $("myField")));
// 3.2. 在 SQL 中调用函数
tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");

函数使用说明

  • 当前 UDF主要有以下几类:
    • 标量函数(Scalar Functions):把 0 到多个标量值映射成 1 个标量值
    • 表函数(Table Functions):把 0 到多个标量值映射成一个表(一个表:n行n列)
    • 聚合函数(Aggregate Functions):把 一个表转换成一个新的标量值(一个表:n行n列)
    • 表聚合函数(Table Aggregate Functions):把 一个表 转换成 另一个表(一个表:n行n列)

标量函数(Scalar Functions)

把 0 到多个标量值映射成 1 个标量值

  1. 继承 ScalarFunction
  2. 编写 eval 方法,方法必须是 public 的
// 1. 编写标量函数
// 		1、继承 ScalarFunction
// 		2、求值方法必须是 public 的,而且名字必须是 eval
public static class HashFunction extends ScalarFunction {
    // 接受任意类型输入,返回 INT 型输出
    // DataTypeHint(inputGroup = InputGroup.ANY)对输入参数的类型做了标注,表示 eval 的参数可以是任意类型
    public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
        return o.hashCode();
    }
}

// 2. 注册函数
env.createTemporarySystemFunction("HashFunction", HashFunction.class);

// 3.1 在 Table API 里调用注册好的函数
env.from("MyTable").select(call("HashFunction", $("myField")));

// 3.2 在 SQL 里调用注册好的函数
env.sqlQuery("SELECT HashFunction(myField) FROM MyTable");

表函数(Table Functions)

把 0 到多个标量值映射成一个表(一个表:n行n列)

编写

  1. 继承 TableFunction
  2. 编写 eval 方法,方法必须是 public 的

使用

  • 在 Table API 中,表值函数是通过 .joinLateral(...) 或者 .leftOuterJoinLateral(...) 来使用的。
    • joinLateral 算子会把外表(算子左侧的表)的每一行跟跟表值函数返回的所有行(位于算子右侧)进行 (cross)join。
    • leftOuterJoinLateral 算子也是把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join,并且如果表值函数返回 0 行也会保留外表的这一行。
  • 在 SQL 里面用 JOIN 或者 以 ON TRUE 为条件的 LEFT JOIN 来配合 LATERAL TABLE(<TableFunction>) 的使用
// 1. 编写表函数
// 		1、继承 TableFunction
// 		2、求值方法必须是 public 的,而且名字必须是 eval
// 		注意这里的类型标注,输出是 Row 类型,Row 中包含两个字段:word 和 length
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {
    public void eval(String str) {
        for (String s : str.split(" ")) {
            // 使用 collect()方法发送一行数据
            collect(Row.of(s, s.length()));
        }
    }
}

// 2. 注册函数
env.createTemporarySystemFunction("SplitFunction", SplitFunction.class);

// 3.1 在 Table API 里调用注册好的函数
env
    .from("MyTable")
    .joinLateral(call("SplitFunction", $("myField")))
    .select($("myField"), $("word"), $("length"));
env
    .from("MyTable")
    .leftOuterJoinLateral(call("SplitFunction", $("myField")))
    .select($("myField"), $("word"), $("length"));

// 3.2 在 SQL 里调用注册好的函数
// 交叉连接
env.sqlQuery(
    "SELECT myField, word, length " +
    "FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
// 左连接
env.sqlQuery(
    "SELECT myField, word, length " +
    "FROM MyTable " +
    "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE");
// 在 SQL 里重命名函数字段
env.sqlQuery(
    "SELECT myField, newWord, newLength " +
    "FROM MyTable " +
    "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE");

聚合函数(Aggregate Functions)

把 一个表转换成一个新的标量值(一个表:n行n列)

  1. 继承 AggregateFunction
  2. 必须实现的方法
    • createAccumulator():创建一个空的 accumulator
    • accumulate():对于每一行数据,会调用 accumulate() 方法来更新 accumulator
    • getValue():当所有的数据都处理完了之后,通过调用 getValue 方法来计算和返回最终的结果
  3. 在某些场景下是必须实现
    • retract() :在 bounded OVER 窗口中是必须实现的,回退 accumulator
    • merge() :在许多批式聚合和会话以及滚动窗口聚合中是必须实现的。除此之外,这个方法对于优化也很多帮助。例如,两阶段聚合优化就需要所有的 AggregateFunction 都实现 merge 方法。
    • resetAccumulator() :在许多批式聚合中是必须实现的,重置 accumulator
/**
 * Accumulator for WeightedAvg.
 */
public static class WeightedAvgAccum {
    public long sum = 0;
    public int count = 0;
}

// 1. 编写聚合函数
// 		继承 AggregateFunction 并 重写方法
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
    // 创建一个空的 accumulator
    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }
    
    // 返回最终的结果
    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return acc.sum / acc.count;
        }
    }

    //更新 accumulator
    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }

    // 回退 accumulator
    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum -= iValue * iWeight;
        acc.count -= iWeight;
    }

    // merge合并操作
    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        Iterator<WeightedAvgAccum> iter = it.iterator();
        while (iter.hasNext()) {
            WeightedAvgAccum a = iter.next();
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }

    // 重置 accumulator
    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}

// 2. 注册函数
tEnv.registerFunction("wAvg", new WeightedAvg());

// 3. 使用函数
tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");

表聚合函数(Table Aggregate Functions)

把 一个表 转换成 另一个表(一个表:n行n列)

  1. 继承 TableAggregateFunction
  2. 必须要实现的方法
    • createAccumulator():创建一个空的 accumulator
    • accumulate():对于每一行数据,会调用 accumulate() 方法来更新 accumulator
    • emitValue():计算和返回最终的结果
  3. 在某些场景下是必须实现
    • retract() 在 bounded OVER 窗口中的聚合函数必须要实现,回退 accumulator
    • merge() 在许多批式聚合和以及流式会话和滑动窗口聚合中是必须要实现的
    • resetAccumulator() 在许多批式聚合中是必须要实现的,重置 accumulator
/**
 * Accumulator for Top2.
 */
public class Top2Accum {
    public Integer first;
    public Integer second;
}

// 1. 编写表聚合函数
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

    // 创建一个空的 accumulator
    @Override
    public Top2Accum createAccumulator() {
        Top2Accum acc = new Top2Accum();
        acc.first = Integer.MIN_VALUE;
        acc.second = Integer.MIN_VALUE;
        return acc;
    }

    // 更新 accumulator
    public void accumulate(Top2Accum acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    // 合并操作
    public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
        for (Top2Accum otherAcc : iterable) {
            accumulate(acc, otherAcc.first);
            accumulate(acc, otherAcc.second);
        }
    }

    // 计算和返回最终的结果
    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
        // emit the value and rank
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}

// 2. 注册函数
tEnv.registerFunction("top2", new Top2());

// 3. 使用函数
tab.groupBy("key")
    .flatAggregate("top2(a) as (v, rank)")
    .select("key, v, rank");

Maven依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
 类似资料: