我使用Flink数据流API中的< code > keyedcorprocessfunction 类来实现一个超时用例。场景如下:我有一个输入kafka主题和一个输出kafka主题,一个服务从输入主题中读取并处理它(持续可变的时间),然后在输出Kafka主题中发布响应。
现在要实现超时(必须使用Flink datastream API),我有一个从kafka输入主题读取的FlinkKafka消费者
和另一个从kafka输出主题读取的FlinkKafka消费者
。我正在连接两个流,并使用Process Element1
我正在注册一个计时器并等待onTimer
方法被触发(声明了超时),或者Process Element2
被触发,因此我删除了计时器并且没有声明超时。
在对大规模事件进行测试时,我看到<code>NULLPTREXCEPTION</code>并且我怀疑<code>processElement2</code>是在<code>ProcessElement1</code>之前触发的,在上述情况下,由于任何原因(知道服务处理元素所花费的时间可能需要几秒),从输出主题读取元素的场景是否会发生在从输入主题读取元素之前?在这种情况下,严格使用Flink DataStream API实现上述超时功能的最佳情况是什么,请提供任何提示?
谢谢你。
是的,不能保证Flink会在Process Element2
之前调用Process Element1
方法。这很可能,但不确定。或者换句话说,可能并不总是这样。
这与您在Flink训练中的LongRideAlerts练习中发现的情况完全相同--https://github.com/apache/flink-training/tree/master/long-ride-alerts--因此,您可以模仿那里使用的解决方案。逻辑是这样的:
每当缺少来自 topic1 的输入事件时,此解决方案都会出现泄漏状态。如果重要,请参阅训练练习附带的讨论,了解如何处理该问题(简短版本:使用状态TTL)。
TL;DR:目前保证Flink中事件时间顺序的最佳解决方案是什么? 我使用Flink 1.8.0和Kafka 2.2.1。我需要通过事件时间戳保证事件的正确顺序。我每隔1秒生成周期性水印。我使用Flink Kafka消费者与AscendingTimestampExtractor: 然后处理: 我意识到,对于在同一毫秒或几毫秒之后发生的无序事件,Flink不会纠正顺序。我在文档中发现: 水印触发所有
我实际上是使用处理来检查从键盘输入的值并采取行动。现在的问题是,我想使用键盘上的数字“1”来根据IF语句执行两个不同的操作,但第二个条件似乎不起作用。请帮助我仔细阅读这段代码,因为我不知道我可能在哪里出错了
我们有一个单节点Cassandra集群(Apache),在AWS上有2个VCPU和大约16 GB的RAM。我们有大约28 GB的数据上传到Cassandra。 现在Cassandra可以很好地使用主键进行选择和分组查询,但是当使用用户定义的函数在非主键上使用聚合函数时,它会给出一个超时。 为了详细说明-我们对3年数据的年份、月份和日期进行了分区。现在,例如,如果两列是-Bill_ID和Bill_A
本文向大家介绍PHP函数超时处理方法,包括了PHP函数超时处理方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了PHP函数超时处理方法。分享给大家供大家参考,具体如下: register_shutdown_function Registers the function named by function to be executed when script processing is c
我研究Flink已经一个多星期了。我们正在从Kafka消费事件,我们希望事件属于一个特定的对象id需要按照事件时间的顺序进行处理。到目前为止,我的研究告诉我,我应该使用keyby和timewinds,我的理解是正确的吗? 另一个问题是,当一个任务管理器关闭时,只有属于该任务管理器的事件才会被停止处理,直到该任务管理器启动?检查点机制是否知道未被处理的事件,它将如何请求Kafka关于这些事件? 下面
问题内容: 是否可以为Alamofire请求添加超时处理程序? 在我的项目中,我以这种方式使用Alamofire: 编辑: 请求失败消息 错误域= NSURLErrorDomain代码= -1001“请求超时。” UserInfo = {NSUnderlyingError = 0x7fc10b937320 {Error Domain = kCFErrorDomainCFNetwork Code =