当前位置: 首页 > 知识库问答 >
问题:

使用键控协同处理函数的 Flink 超时 FlinkKafkaConsumer 的读取顺序

万俟炯
2023-03-14

我使用Flink数据流API中的< code > keyedcorprocessfunction 类来实现一个超时用例。场景如下:我有一个输入kafka主题和一个输出kafka主题,一个服务从输入主题中读取并处理它(持续可变的时间),然后在输出Kafka主题中发布响应。

现在要实现超时(必须使用Flink datastream API),我有一个从kafka输入主题读取的FlinkKafka消费者和另一个从kafka输出主题读取的FlinkKafka消费者。我正在连接两个流,并使用Process Element1我正在注册一个计时器并等待onTimer方法被触发(声明了超时),或者Process Element2被触发,因此我删除了计时器并且没有声明超时。

在对大规模事件进行测试时,我看到<code>NULLPTREXCEPTION</code>并且我怀疑<code>processElement2</code>是在<code>ProcessElement1</code>之前触发的,在上述情况下,由于任何原因(知道服务处理元素所花费的时间可能需要几秒),从输出主题读取元素的场景是否会发生在从输入主题读取元素之前?在这种情况下,严格使用Flink DataStream API实现上述超时功能的最佳情况是什么,请提供任何提示?

谢谢你。

共有1个答案

莘钧
2023-03-14

是的,不能保证Flink会在Process Element2之前调用Process Element1方法。这很可能,但不确定。或者换句话说,可能并不总是这样。

这与您在Flink训练中的LongRideAlerts练习中发现的情况完全相同--https://github.com/apache/flink-training/tree/master/long-ride-alerts--因此,您可以模仿那里使用的解决方案。逻辑是这样的:

  • 无论哪个事件(对于给定密钥)首先到达,从任一流中存储它
  • 如果输入事件(来自主题 1)首先到达,请设置计时器
  • 如果输出事件(来自 topic2)在计时器触发之前到达第二个,请删除计时器
  • 无论哪个事件(对于给定密钥)从任一流中秒到达,请清除状态(由两个流共享)
  • 每当计时器触发时,使用从主题 1 保存的输入事件以键控状态创建报告并清除状态

每当缺少来自 topic1 的输入事件时,此解决方案都会出现泄漏状态。如果重要,请参阅训练练习附带的讨论,了解如何处理该问题(简短版本:使用状态TTL)。

 类似资料:
  • TL;DR:目前保证Flink中事件时间顺序的最佳解决方案是什么? 我使用Flink 1.8.0和Kafka 2.2.1。我需要通过事件时间戳保证事件的正确顺序。我每隔1秒生成周期性水印。我使用Flink Kafka消费者与AscendingTimestampExtractor: 然后处理: 我意识到,对于在同一毫秒或几毫秒之后发生的无序事件,Flink不会纠正顺序。我在文档中发现: 水印触发所有

  • 我实际上是使用处理来检查从键盘输入的值并采取行动。现在的问题是,我想使用键盘上的数字“1”来根据IF语句执行两个不同的操作,但第二个条件似乎不起作用。请帮助我仔细阅读这段代码,因为我不知道我可能在哪里出错了

  • 我们有一个单节点Cassandra集群(Apache),在AWS上有2个VCPU和大约16 GB的RAM。我们有大约28 GB的数据上传到Cassandra。 现在Cassandra可以很好地使用主键进行选择和分组查询,但是当使用用户定义的函数在非主键上使用聚合函数时,它会给出一个超时。 为了详细说明-我们对3年数据的年份、月份和日期进行了分区。现在,例如,如果两列是-Bill_ID和Bill_A

  • 本文向大家介绍PHP函数超时处理方法,包括了PHP函数超时处理方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了PHP函数超时处理方法。分享给大家供大家参考,具体如下: register_shutdown_function Registers the function named by function to be executed when script processing is c

  • 我研究Flink已经一个多星期了。我们正在从Kafka消费事件,我们希望事件属于一个特定的对象id需要按照事件时间的顺序进行处理。到目前为止,我的研究告诉我,我应该使用keyby和timewinds,我的理解是正确的吗? 另一个问题是,当一个任务管理器关闭时,只有属于该任务管理器的事件才会被停止处理,直到该任务管理器启动?检查点机制是否知道未被处理的事件,它将如何请求Kafka关于这些事件? 下面

  • 问题内容: 是否可以为Alamofire请求添加超时处理程序? 在我的项目中,我以这种方式使用Alamofire: 编辑: 请求失败消息 错误域= NSURLErrorDomain代码= -1001“请求超时。” UserInfo = {NSUnderlyingError = 0x7fc10b937320 {Error Domain = kCFErrorDomainCFNetwork Code =