我尝试通过IDs连接两个数据流,发现有两个API集可以这样做,
看来他们俩都能完成这项工作。
所以我的问题是:
谢了。
主要区别是什么?如何选择?
有几种不同的API可以用来实现与Flink的连接。您可以在https://training.ververica.com/decks/joins/?mode=presenter的Ververica共享的Apache Flink开发人员培训材料中找到对不同方法的调查(在注册表后面)。声明:我写了这些培训材料。
总而言之:
实现流连接的低级构建块是KeyedCoProcessFunction
。在拥有完全控制很有价值的特殊情况下,直接使用它是有意义的,但对于大多数目的,您最好使用更高级别的API。
DataSet API提供了作为哈希连接、排序合并连接和广播连接实现的批量连接。该API已被软否决,最终将被有界流和Flink的关系API(SQL/Table)的组合所取代。
DataStream API仅提供一些时间窗口和间隔连接。它不支持任何可能需要无界状态保留的连接。
SQL/Table API支持多种批处理和流式连接:
流动
仅流式传输
SQL优化器能够推理出由于时间约束而不再需要的状态。但是一些流式连接确实有可能需要无界状态来产生完全正确的结果;可以实施状态保留策略来清除不太可能需要的陈旧条目。
请注意,表 API 可与 DataStream API 完全互操作。我会尽可能使用SQL/表联接,因为它们更容易实现,并且得到了很好的优化。
如果我连接流A和B,且两者都有很多记录(例如,A:10000,B:20000),那么不同流中所有记录是否都要逐一进行比较?比较总数是10000x20000?
Flink支持等键连接,对于某个特定的键,您希望连接流A和B中对该键具有相同值的记录。如果来自A的10000条记录和来自B的20000条记录都具有相同的键,那么是的,A和B的无约束连接将产生10000x20000个结果。
但我不相信你是这个意思。Flink将在其托管状态下实现分布式哈希表,这些哈希表将在集群中(按键)分片。例如,当新记录从流A到达时,它将被散列到A的构建端哈希表中,B的相应哈希表将被探测以找到匹配的记录——并且将发出所有合适的结果。
请注意,这是并行完成的。但是来自A和B的特定键的所有事件将由同一个实例处理。
此外,是否有任何情况(可能是网络问题),流B延迟,然后流B中的某些记录没有与流A进行比较?
如果您将事件时间处理与SQL/Table API提供的时间窗或间隔连接结合起来,则不会考虑后期事件(由水印确定),并且结果将是不完整的。使用数据流API,可以对后期事件进行特殊处理,比如将它们发送到侧面输出,或者收回和更新结果。
对于没有时间约束的连接,延迟事件无论何时到达都会被正常处理。结果(最终)是完整的。
我试图从动态表和基于某些字段的流中派生新表。 有没有人能为你提供最好的指导。我对flink和尝试新事物是陌生的。 书籍 ============================ BookId, Instruments, Quantity Book1, Goog,100 Book2, Vod,10 Book1, Appl,50 Book2, Goog,60 Book1, Vod,130 Book3,
我正在流媒体环境中使用Flink的表API和/或Flink的SQL支持(Flink 1.3.1、Scala 2.11)。我从一个数据流【Person】开始,Person是一个case类,看起来像: 在我开始将属性带入图片之前,一切都按预期进行。 例如: 。。。导致: 线程“main”组织中出现异常。阿帕奇。Flink。桌子api。TableException:不支持类型:组织中的任何。阿帕奇。Fl
在Apache Flink流处理中,连接操作与连接有何不同,因此CoProcessFunction和ProcessJoinFunction有何不同,这是CoProcessFunction提供的onTimer函数吗?您能否提供一个适用于以相互排斥的方式连接/连接的示例用例。
如何比较两个列表是否相等验证数据来自Excel工作表。我需要验证两个列表是否相同,并且列表中没有附加元素或缺少元素。我不需要对列表进行排序。打印输出CAGID Excel data=CAGID Web列表
问题内容: 我有一个查询表,说具有字段CityId,CityName的城市 我有一个订单表,其中包含以下字段:CityId,CustId,CompletedOrders,PendingOrders 我想要一个表/报告,列出所有城市中给定客户的订单详细信息,即我需要的结果是: 怎么做 ? 问题答案: 这将返回所需的所有行,但是对于其中不存在的行将返回值,因此您将获得: 取而代之的解决方案取决于您的数
我正在阅读 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#lookup-join, 它使用MySQL作为临时表联接中的查找表 我想知道flink如何与MySQL交互,以及mysql方面是否存在临时加入mysql的性能问题。 基本问题是flink如何使用my