当前位置: 首页 > 知识库问答 >
问题:

如何通过datastrem API或Flink Table API/SQL将给定键和公共窗口上的三个或多个数据流/表连接起来?

公羊英达
2023-03-14
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});

=================================

对于FlinK表API和SQL同样的问题,如何在给定的键和公共窗口上连接三个或更多的表?官方文档https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sql.html只给出了下面单表的示例。

Table result1 = tableEnv.sqlQuery(
"SELECT user, " +
"  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,  " +
"  SUM(amount) FROM Orders " +
"GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");

我尝试编写如下所示的SQL,以便在给定的键和公共窗口上连接三个表,但我认为这是不对的。

String SQL = "SELECT" +
            " grades.user1  , SUM(salaries.amount)   FROM grades " +
            " INNER JOIN salaries ON   grades.user1 =   salaries.user1 " +
            " INNER JOIN person ON   grades.user1 =   person.user1 "+
             "GROUP BY grades.user1, TUMBLE(grades.proctime,  INTERVAL '5' SECOND) "   

对于Flink SQL,我所需要的,就像下面的伪代码一样,是用一个通用的TumblingEventTimeWindow连接三个表,也就是说DataStream API的替代版本,不管用Flink SQL表示,也意味着连接发生在同一TumblingEventTimeWindow中的三个表中的所有事件。

SELECT A.a, B.b, C.c
FROM A, B, C
WHERE A.x = B.x AND A.x = C.x AND
window(TumblingEventTimeWindows.of(Time.seconds(3))

在下面的Flink设计文档中似乎也提到了连接特性:“事件时间滚动窗口流-流连接:连接处于同一滚动事件时间窗口中的两个流的元组”,我不知道Flink SQL是否实现了这种类型的Flink SQL连接特性。

共有1个答案

章阳波
2023-03-14

很难对您的问题给出一个明确的答案,因为您需要的连接的语义不清楚。数据流API的窗口连接实现的语义不同于表API/SQL的窗口连接。

在DataStream API上,您可以简单地定义另一个联接,如下所示:

firstStream
  .join(secondStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})
  .join(thirdStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})

由于Flink实现了标准SQL,您可以像往常一样定义三个表的联接:

SELECT A.a, B.b, C.c
  FROM A, B, C
  WHERE A.x = B.x AND A.x = C.x AND
        A.ts BETWEEN B.ts - INTERVAL '10' MINUTE AND B.ts + INTERVAL '10' MINUTE AND
        A.ts BETWEEN C.ts - INTERVAL '10' MINUTE AND C.ts + INTERVAL '10' MINUTE
 类似资料:
  • 错误结果为 “OneToOne关系的JPA问题:外键引用的列数错误。应为2” 如何为join测试表指定主键? 表A:列id与表B:列test_id映射

  • 问题内容: 给定以字典为元素的列表,我想产生一个新列表,其中包含一组连接的字典。每个字典都保证有一个称为“索引”的键,但除此之外可以有任意键集。非索引键永远不会在列表之间重叠。例如,想象以下两个列表: (永远不会出现在中,因为它出现在中,并且类似地,永远不会出现在中,因为它出现在中) 我想产生一个联合列表: 在Python中最有效的方法是什么? 问题答案: 编辑 :由于不能保证被排序(不按特定顺序

  • 问题内容: 2个流: 给定可读流, 并且 获取包含 并 连接 流的惯用(简洁)方法是什么? 我不能做,因为这样流内容混杂在一起。 n个 流: 给定一个EventEmitter发出不确定数量的流,例如 一种将 所有流串联在一起的流 的惯用(简洁)方法是什么? 问题答案: 该合并的流包会连接流。自述文件中的示例: 我相信您必须立即添加所有流。如果队列为空,则自动结束。参见问题5。 该流流库是一个具有明

  • 有三张桌子 库存-包含库存信息 订单-包含订单的存储信息 OrderParts-包含与订单关联的部分 库存表 订单表 订购零件表 需要生成以下预期报告 查询试图实现上述结果 获取已订购数量为空 请告诉我如何得到预期的累计订货量。提前道谢。

  • 问题内容: 我正在尝试从第二个窗口切换到第三个窗口。但是无法处理第三个窗口。有人可以帮助我解决此问题。我已经使用比较窗口标题的逻辑,但是它不起作用。代码======================= 错误堆栈跟踪: 问题答案: 这是切换到 并单击 按钮的完整代码块: 我的IDE控制台上的输出是:

  • 本文向大家介绍emacs 多个窗口或框架,包括了emacs 多个窗口或框架的使用技巧和注意事项,需要的朋友参考一下 示例 Emacs中的“窗口”指的是否则称为“窗格”或“屏幕划分”的内容。一些窗口操作命令包括: 水平分割当前窗口: C-x 2 垂直拆分当前窗口: C-x 3 选择下一个窗口: C-x o 关闭当前窗口: C-x 0 关闭所有其他窗口,但当前窗口除外: C-x 1 Emacs中的“框