当前位置: 首页 > 知识库问答 >
问题:

对intervalJoin感到困惑

燕俊明
2023-03-14

我正在尝试提出一种解决方案,它涉及在连接操作之后应用一些逻辑,从多个事件b中的streamB中选择一个事件。这类似于reduce函数,但它只返回1个元素,而不是递增地返回。因此最终结果将是单个(EventAEventB对,而不是一个EventA>/code>和多个EventB的叉积。

streamA
      .keyBy((a: EventA) => a.common_key)
      .intervalJoin(
          streamB
            .keyBy((b: EventB) => b.common_key)
        )
      .between(Time.days(-30), Time.days(0))
      .process(new MyJoinFunction)

数据将被摄取(假设它们具有相同的密钥):

EventB ts:1616686386000

< code > EventB ts:1616686387000

事件 B ts: 1616686388000

EventB ts:1616686389000

事件 A ts: 1616686390000

每个EventA键保证只到达一次。

假设像上面这样的连接操作,它用4个事件b生成了1个EventA,成功地连接并收集在MyJoinFunction。现在,我想做的是,立即访问这些值,并执行一些逻辑以将事件a正确匹配到一个EventB。例如,对于上面的数据集,我需要(EventA1616686390000EventB1616686387000)。

将为每个(< code>EventA,< code>EventB)对调用< code>MyJoinFunction,但我希望在此之后有一个操作,它允许我访问一个迭代器,以便我可以查看每个< code>EventA的所有< code>EventB事件。

我知道我可以在连接后应用另一个窗口操作来对所有对进行分组,但我希望在连接成功后立即进行。因此,如果可能,我希望避免添加另一个窗口,因为我的窗口已经很大(30天)。

Flink是这个用例的正确选择,还是我完全错了?

共有1个答案

郑高驰
2023-03-14

这可以实现为KeyedCoProcessFunction。您将通过它们的公共密钥对两个流进行键值,连接它们,并一起处理两个流。

您可以使用ListState存储来自B的事件(对于给定键),以及ValueState存储来自a的事件(同样,对于给定键)。您可以使用事件时间计时器来了解何时可以查看ListState中的B事件,并生成结果。完成后不要忘记清除状态。

如果您不熟悉Flink API的这一部分,有关流程函数的教程应该会有所帮助。

 类似资料:
  • 所以我一直在读Kafka的语义学,我对它的工作原理有点困惑。 我理解生产者如何避免发送重复的消息(以防代理的ack失败),但我不明白的是,在消费者处理消息但在提交偏移量之前崩溃的情况下,一次是如何工作的。Kafka不会在这种情况下重试吗?

  • 问题内容: 我已经在eclipse中创建了一个项目,并添加了Maven依赖项。在Eclipse中,它表示我正在使用JRE 1.5。一切在Eclipse中都可以正常运行,例如,我可以运行测试。 当我尝试从终端运行时,出现以下错误。 …在-source 1.3中不支持泛型(使用-source 5或更高版本来启用泛型)… 看来,Maven认为我正在使用JRE 1.3,并且无法识别泛型或for-each循

  • 问题内容: 在碰到此链接http://www.javacodegeeks.com/2013/01/java-thread-pool-example-using- executors-and-threadpoolexecutor 之后,这是我第一次为新项目使用Java线程池。 .html ,我对此更加困惑,这是页面中的代码, 在代码中,创建了一个固定大小的池并创建了10个工作线程,对吗? 线程池应该

  • 问题内容: 与此代码有点混淆。 我在pg-go 仓库中找到了这段代码,不知道为什么这样声明。请解释一下用这种方式声明变量的用例是什么。 问题答案: 这在运行时不会执行任何操作,但是除非类型满足接口要求,否则编译将失败。这是一种静态断言。

  • 问题内容: 我可以理解以下定义: 每个对象都有一个标识,一个类型和一个值。一旦创建了对象,其身份就永远不会改变。您可能会认为它是对象在内存中的地址。所述操作者比较两个对象的身份; 该函数返回一个表示其身份的整数。 我认为上面的定义在创建“某物”时起作用,例如: 但是我不理解: 我还没有创建任何东西。那么整数“ 1”如何具有ID?这是否意味着只要我在Python Shell中“提及” 1,便立即将其

  • 问题内容: 映射为: 运行以上查询将返回文档。我想了解松紧带在幕后做什么?通过查看默认分析器的输出,它不会标记癌症,使其返回“可以”,那么为什么返回带有“可以”一词的文档,又是什么原因导致该文档被返回呢?换句话说,搜索查询“癌症”正在发生什么其他处理。 更新 我是否可以在我的机器上运行一个命令,该命令将清除所有索引和所有内容,因此我的表盘整洁?我执行了删除/ *的操作,但成功了,但仍然匹配了。 问