当前位置: 首页 > 知识库问答 >
问题:

在Flink中按顺序读取两个流(main和configs)

邹正阳
2023-03-14

我有两个流,一个是主流,比如在欺诈检测的示例中,我有事务流,然后我有第二个流,即配置,在我们的示例中是规则。所以我将主流连接到配置流以进行处理。但当flink第一次启动时,我们正在添加作业,它开始消耗事务并配置流并行,当需要处理事务时,它有时会看到没有配置,我们必须将事务发送到死信队列。然而,我想实现的是,如果有专利配置,我可以稍后再获取,我想先获取该配置,然后获取事务,以便处理它,而不是将其发送到死信队列。对于事务和配置,我有相同的密钥。

长话短说,有没有办法告诉Flink,当第一次工作开始尝试使用一条流,直到没有新的价值,然后开始处理主流?我怎样才能让它们有顺序?

共有2个答案

彭令秋
2023-03-14

从版本1.16开始,您可以使用Flink中的混合源代码支持在读取第二个源代码之前读取所有一次源代码(在您的情况下是配置)。虽然我想您必须将事件映射到

刁冠宇
2023-03-14

建议的方法是连接这两个流并应用RichCoFlatMap,它将允许您在等待接收配置事件时缓冲来自main的事件。

查看Flink教程的这个有用部分。最后一段实际上描述了您的问题。

必须认识到,您无法控制调用flatMap1和flatMap2回调的顺序。这两个输入流相互竞争,Flink运行时将根据需要处理来自一个流或另一个流的事件。在计时和/或排序很重要的情况下,您可能会发现有必要将事件缓冲在托管Flink状态,直到您的应用程序准备好处理它们为止。(注意:如果您真的很绝望,可以通过使用实现InputSelectable接口的自定义操作符,对双输入操作符使用其输入的顺序施加一些有限的控制。

因此,简而言之,您应该连接两个流,并具有某种ListState,您可以在等待接收规则时“缓冲”主要元素。当您从配置流接收到一个元素时,请检查ListState(缓冲区)中是否有一些挂起的元素“等待”该配置。如果这样做,则可以处理这些元素并通过平面图的收集器发射它们。

 类似资料:
  • 下面的相同代码显示了两个源函数-一个产生0-20的偶数,另一个产生1-20的奇数,连接在一起以输出所有两个流的并集并将它们打印出来。 示例代码: 输出 Q1. Flink应该将连接流中最先到达的项目发送到协处理函数。然而,我们在这里看到的是,数字“2”是以源函数的方式在数字“11”之前生成的,但数字“11”是在“2”之前发送给协处理函数的。为什么会这样? 第二季度。 连接流中无背压发生。源函数一直

  • 第一次使用python。我正在尝试浏览包含段落和表格的word文档。我已经弄清楚了如何使用以下代码浏览文档中的所有段落和文档中的所有表格: 但我正试图找到一种方法,像任何阅读它的人一样,有序地浏览这份文件。所以如果我们有一份文件包含: 它会按照这个顺序读。我想这样做的原因是,根据表格后面的段落,我想对它执行不同的操作。

  • 问题内容: 我在Oracle中有两个表,作业和参考。 我想在两个表中插入一条新记录,并使用从序列中生成的键。就像是: 当然,这导致: 有没有不使用PL / SQL的方法?我非常喜欢仅使用SQL来做到这一点。 问题答案: 您可以为此使用多表插入语法的副作用: SQL小提琴。 从限制: 您不能在多表插入语句的任何部分中指定序列。多表插入被视为单个SQL语句。因此,对NEXTVAL的第一个引用将生成下一

  • 例如,我想在单个中组合和的流,因此结果应该是:。换句话说:如果第一个源已耗尽-从第二个源获取元素。我最近的尝试是: 也对datetime进行了类似的尝试,但结果相同。

  • 我开始使用flink,看看官方教程之一。 据我所知,这个练习的目标是在时间属性上加入两个流。 任务: 此练习的结果是一个Tuple2记录的数据流,每个记录对应一个不同的rideId。您应该忽略结束事件,只在每次骑乘开始时加入事件,并提供相应的票价数据。 生成的流应打印到标准输出。 问:EnrichmentFunction如何连接这两个流aka。它如何知道参加哪个集市和哪个骑行?我希望它能够缓冲多个

  • 我有两个流,希望将第二个流连接到窗口内的第一个流,因为我需要对与会话相关的两个流的连接进行一些计算(流的连接控制会话)。 实际上,当从留档读取时,(会话)窗口只允许在单个流上进行计算,而不允许在连接中进行计算。 我曾尝试使用会话窗口和协处理器函数的组合,但结果并不完全符合我的预期。 有没有办法合并Flink中与会话窗口相关的两个流?