当前位置: 首页 > 知识库问答 >
问题:

将Flink表API与连接表和DataStream.join()进行比较

舒嘉德
2023-03-14

我尝试通过IDs连接两个数据流,发现有两个API集可以这样做,

    < Li > https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining . html < Li > https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/table/SQL/queries . html # joins

看来他们俩都能完成这项工作。

所以我的问题是:

    < li >主要区别是什么?如何挑选? < li >如果我加入流A和B,且两者都有大量记录(例如,A:10000,B:20000 ),那么不同流中所有记录是否会逐一进行比较?比较总数是10000x20000? < li >此外,是否有任何情况(可能是网络问题),流B被延迟,然后流B中的某些记录不能与流A进行比较?

谢了。

共有1个答案

邵昆琦
2023-03-14

主要区别是什么?如何选择?

有几种不同的API可以用来实现与Flink的连接。您可以在https://training.ververica.com/decks/joins/?mode=presenter的Ververica共享的Apache Flink开发人员培训材料中找到对不同方法的调查(在注册表后面)。声明:我写了这些培训材料。

总而言之:

实现流连接的低级构建块是KeyedCoProcessFunction。在拥有完全控制很有价值的特殊情况下,直接使用它是有意义的,但对于大多数目的,您最好使用更高级别的API。

DataSet API提供了作为哈希连接、排序合并连接和广播连接实现的批量连接。该API已被软否决,最终将被有界流和Flink的关系API(SQL/Table)的组合所取代。

DataStream API仅提供一些时间窗口和间隔连接。它不支持任何可能需要无界状态保留的连接。

SQL/Table API支持多种批处理和流式连接:

流动

  • 时间窗口和间隔内外接
  • 非窗口内部外部连接

仅流式传输

  • 时间版本的内部联接
  • 外部查找内部联接

SQL优化器能够推理出由于时间约束而不再需要的状态。但是一些流式连接确实有可能需要无界状态来产生完全正确的结果;可以实施状态保留策略来清除不太可能需要的陈旧条目。

请注意,表 API 可与 DataStream API 完全互操作。我会尽可能使用SQL/表联接,因为它们更容易实现,并且得到了很好的优化。

如果我连接流A和B,且两者都有很多记录(例如,A:10000,B:20000),那么不同流中所有记录是否都要逐一进行比较?比较总数是10000x20000?

Flink支持等键连接,对于某个特定的键,您希望连接流A和B中对该键具有相同值的记录。如果来自A的10000条记录和来自B的20000条记录都具有相同的键,那么是的,A和B的无约束连接将产生10000x20000个结果。

但我不相信你是这个意思。Flink将在其托管状态下实现分布式哈希表,这些哈希表将在集群中(按键)分片。例如,当新记录从流A到达时,它将被散列到A的构建端哈希表中,B的相应哈希表将被探测以找到匹配的记录——并且将发出所有合适的结果。

请注意,这是并行完成的。但是来自A和B的特定键的所有事件将由同一个实例处理。

此外,是否有任何情况(可能是网络问题),流B延迟,然后流B中的某些记录没有与流A进行比较?

如果您将事件时间处理与SQL/Table API提供的时间窗或间隔连接结合起来,则不会考虑后期事件(由水印确定),并且结果将是不完整的。使用数据流API,可以对后期事件进行特殊处理,比如将它们发送到侧面输出,或者收回和更新结果。

对于没有时间约束的连接,延迟事件无论何时到达都会被正常处理。结果(最终)是完整的。

 类似资料:
  • 我试图从动态表和基于某些字段的流中派生新表。 有没有人能为你提供最好的指导。我对flink和尝试新事物是陌生的。 书籍 ============================ BookId, Instruments, Quantity Book1, Goog,100 Book2, Vod,10 Book1, Appl,50 Book2, Goog,60 Book1, Vod,130 Book3,

  • 我正在流媒体环境中使用Flink的表API和/或Flink的SQL支持(Flink 1.3.1、Scala 2.11)。我从一个数据流【Person】开始,Person是一个case类,看起来像: 在我开始将属性带入图片之前,一切都按预期进行。 例如: 。。。导致: 线程“main”组织中出现异常。阿帕奇。Flink。桌子api。TableException:不支持类型:组织中的任何。阿帕奇。Fl

  • 在Apache Flink流处理中,连接操作与连接有何不同,因此CoProcessFunction和ProcessJoinFunction有何不同,这是CoProcessFunction提供的onTimer函数吗?您能否提供一个适用于以相互排斥的方式连接/连接的示例用例。

  • 如何比较两个列表是否相等验证数据来自Excel工作表。我需要验证两个列表是否相同,并且列表中没有附加元素或缺少元素。我不需要对列表进行排序。打印输出CAGID Excel data=CAGID Web列表

  • 问题内容: 我有一个查询表,说具有字段CityId,CityName的城市 我有一个订单表,其中包含以下字段:CityId,CustId,CompletedOrders,PendingOrders 我想要一个表/报告,列出所有城市中给定客户的订单详细信息,即我需要的结果是: 怎么做 ? 问题答案: 这将返回所需的所有行,但是对于其中不存在的行将返回值,因此您将获得: 取而代之的解决方案取决于您的数

  • 我正在阅读 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#lookup-join, 它使用MySQL作为临时表联接中的查找表 我想知道flink如何与MySQL交互,以及mysql方面是否存在临时加入mysql的性能问题。 基本问题是flink如何使用my