当前位置: 首页 > 工具软件 > Sliding Table > 使用案例 >

Flink Table API 使用详解

楚昊明
2023-12-01

Table API是流处理和批处理通用的关系型API,Table API可以基于流输入或者批输入来运行而不需要进行任何修改。Table API是SQL语言的超集并专门为Apache Flink设计的,Table API是Scala 和Java语言集成式的API。与常规SQL语言中将查询指定为字符串不同,Table API查询是以Java或Scala中的语言嵌入样式来定义的,具有IDE支持如:自动完成和语法检测。

Table API与Flink的SQL集成共享许多其API的概念和部分,请参考通用的概念和API来了解如何注册table或者创建一个Table对象,Streaming Concepts页讨论了特殊的概念如:动态表和时间属性。

接下来的例子中假设注册了一个名叫Orders的表并有(a, b, c, rowtime)属性,rowtime字段可以是流中的逻辑时间字段或者是批中的常规时间戳字段。

概述和实例

Table API可以用于Scala和Java中,Scala Table API利用了Scala表达式,Java Table API则是基于字符串来的,字符串会被解析并转换成等价的表达式。

接下来的例子展示了Scala 和 Java Table API的不同之处,表程序是在批环境中执行的,它扫描Orders表,根据a字段来分组,并计算每个分组的结果,表程序的结果转换为一个Row类型的DataSet并打印出来。

Java Table API可以通过导入org.apache.flink.table.api.java.*来启用,下面的例子展示了Java Table API程序如何构建及表达式如何指定为字符串。

// 配置环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// 在表环境中注册Orders表
// ...

// 指定一个表程序
Table orders = tEnv.scan("Orders"); // schema (a, b, c, rowtime)

Table counts = orders
        .groupBy("a")
        .select("a, b.count as cnt");

// 结果转换为 DataSet
DataSet<Row> result = tableEnv.toDataSet(counts, Row.class);
result.print();

Scala Table API可以通过导入org.apache.flink.api.scala._org.apache.flink.table.api.scala._包来启用。

下面例子展示了Scala Table API如何构建, Table属性使用Scala表达式来引用,Scala表达式以`开头:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._

// 配置环境
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

//在表环境中注册一个Orders表
// ...

// specify table program
val orders = tEnv.scan("Orders") // schema (a, b, c, rowtime)

val result = orders
               .groupBy('a)
               .select('a, 'b.count as 'cnt)
               .toDataSet[Row] // 转换为 DataSet
               .print()

下面的例子展示了一个更加复杂的Table API程序,程序再次扫描Orders表,过滤空值,将a字符字段转为小写,并每小时计算一次产生一个平均费用b:

// 配置环境
// ...

// 指定表程序
Table orders = tEnv.scan("Orders"); // schema (a, b, c, rowtime)

Table result = orders
        .filter("a.isNotNull && b.isNotNull && c.isNotNull")
        .select("a.lowerCase(), b, rowtime")
        .window(Tumble.over("1.hour").on("rowtime").as("hourlyWindow"))
        .groupBy("hourlyWindow, a")
        .select("a, hourlyWindow.end as hour, b.avg as avgBillingAmount");
// 配置环境
// ...

// 指定表程序
val orders: Table = tEnv.scan("Orders") // schema (a, b, c, rowtime)

val result: Table = orders
        .filter('a.isNotNull && 'b.isNotNull && 'c.isNotNull)
        .select('a.lowerCase(), 'b, 'rowtime)
        .window(Tumble over 1.hour on 'rowtime as 'hourlyWindow)
        .groupBy('hourlyWindow, 'a)
        .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount);

因为Table API是流数据和批数据统一的API,所有的示例程序都可以在批输入或者流输入中执行而不需要修改程序。在两种情况下都会产生相同的结果,使得流记录不会延迟。

操作

Table API支持下面的操作,请注意并不是所有的操作都同时支持批程序和流程序,不支持的会被响应的标记出来。

Scan,Projection和Filter (扫描、映射和过滤)

操作描述
Scan与SQL中的FROM相似,对已注册的表执行扫描操作
Table orders = tableEnv.scan("Orders");
Select与SQL的SELECT语义类似,执行一个select操作
Table orders = tableEnv.scan("Orders");
Table result = orders.select("a, c as d");
你也可以使用(*)作为通配符,来查询表中所有字段的值
Table result = orders.select("*");
As重命名字段
Table orders = tableEnv.scan("Orders");
Table result = orders.as("x, y, z, t");
Where / Filter与SQL中的WHERE类似,过滤不通过过滤谓词的行
Table orders = tableEnv.scan("Orders");
Table result = orders.where("b === 'red'");
或者
Table orders = tableEnv.scan("Orders");
Table result = orders.filter("a % 2 === 0");
操作描述
Scan与SQL查询的From类似,对注册的表执行扫描操作
val orders: Table = tableEnv.scan("Orders")
Select与SQL脚本的SELECT类似,执行一个select操作:
val orders: Table = tableEnv.scan("Orders")
val result = orders.select('a, 'c as 'd)
你可以使用(*)作为通配符,查询表中的所有字段
val orders: Table = tableEnv.scan("Orders")
val result = orders.select('*)
As重命名字段:
val orders: Table = tableEnv.scan("Orders").as('x, 'y, 'z, 't)
Where/Filter与SQL的WHERE类似,过滤掉不满足过滤谓词的行
val orders: Table = tableEnv.scan("Orders")
val result = orders.filter('a % 2 === 0)或者val orders: Table = tableEnv.scan("Orders")
val result = orders.where('b === "red")

聚合

操作描述
GroupBy 聚合与SQL的GROUP BY类似,使用以下运行的聚合运算符对分组键上的行进行分组,从而以组方式聚合行。
Table orders = tableEnv.scan("Orders");
Table result = orders.groupBy("a").select("a, b.sum as d");
GroupBy Window 聚合对分组窗口上的表,进行分组和聚合,可能会按一个或者多个key进行分组
Table orders = tableEnv.scan("Orders");
Table orders = tableEnv.scan("Orders");
Table result = orders .window(Tumble.over("5.minutes").on("rowtime").as("w")) // define window
  .groupBy("a, w") // group by key and window
  .select("a, w.start, w.end, b.sum as d"); // access window properties and aggregate
Over Window 聚合与SQL的OVER类似,根据前一行和后续行的窗口(范围)为每行计算窗口聚合,参看over window获取更多信息。
Table orders = tableEnv.scan("Orders");
Table result = orders
.window(Over
.partitionBy("a")
.orderBy("rowtime")
.preceding("UNBOUNDED_RANGE")
.following("CURRENT_RANGE")
.as("w")
.select("a, b.avg over w, b.max over w, b.min over w") // sliding aggregate
Distinct与SQL的DISTINCT类似,返回不重合的记录。
Table orders = tableEnv.scan("Orders");
Table result = orders.distinct();
操作描述
GroupBy 聚合与SQL的GROUP BY类似,使用以下运行的聚合运算符对分组键上的行进行分组,从而以组方式聚合行。
val orders: Table = tableEnv.scan("Orders")
val result = orders.groupBy('a).select('a, 'b.sum as 'd)
GroupBy Window 聚合对分组窗口上的表,进行分组和聚合,可能会按一个或者多个key进行分组
val orders: Table = tableEnv.scan("Orders")
val result: Table = orders
  .window(Tumble over 5.minutes on 'rowtime as 'w)
  .groupBy('a, 'w) // group by key and window
  .select('a, w.start, 'w.end, 'b.sum as 'd) // access window properties and aggregate
Over Window 聚合与SQL的OVER类似,根据前一行和后续行的窗口(范围)为每行计算窗口聚合,参看over window获取更多信息。
val orders:Table = tableEnv.scan("Orders");
val result:Table = orders
  .window(Over
  .partitionBy("a")
  .orderBy("rowtime")
  .preceding("UNBOUNDED_RANGE")
  .following("CURRENT_RANGE")
  .as("w")
  .select("a, b.avg over w, b.max over w, b.min over w") // sliding aggregate
Distinct与SQL的DISTINCT类似,返回不重合的记录。
val orders:Table = tableEnv.scan("Orders");
val result:Table = orders.distinct();

Joins

操作描述
Inner Join与SQL的JOIN 类似,连接两张表,两张表必须具有不同的字段名称,并且必须至少具有一个相等的连接谓词,必须通过连接运算符或使用where或filter运算符来定义。
  Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  Table right = tableEnv.fromDataSet(ds2, "d, e, f");
  Table result = left.join(right).where("a = d").select("a, b, e");
Left Outer Join与SQL的LEFT OUTER JOIN类似,
Right Outer Join与SQL的RIGHT OUTER JOIN类似,
Full Outer Join与SQL的FULL OUTER JOIN 类似,
TableFunction Join与SQL的TableFunction Join类似
TableFunction Left Outer Join与SQL的TableFunction Left Outer Join类似

 

 类似资料: