当前位置: 首页 > 知识库问答 >
问题:

Apache Flink延迟处理某些事件

戴化
2023-03-14

我需要延迟处理一些事件。

我有三件事(发表在Kafka上):

  • A(id: 1, retry At: now)
  • B(id: 2, retry At: 10分钟后)
  • C(id: 3, retry At: now)

我需要立即处理记录A和C,而记录B需要在十分钟后处理。这在Apache Flink中实现可行吗?

到目前为止,无论我研究了什么,“触发器”似乎都有助于在Flink中实现它,但还没有能够正确实现它。

我也查阅了Kafka的文档,但在那里似乎不可行。

共有1个答案

满自明
2023-03-14

触发器适用于windows,但窗口似乎不适合您的用例。

更好的解决方案是使用带有键控处理功能的计时器。根据要等待10分钟的处理时间还是10分钟的事件时间,您将选择处理时间计时器或事件时间计时器。

您还需要使用Flink state来存储需要稍后处理的事件。

您可以在这里找到流程函数的文档。Flink训练中还有一些其他的例子,这里和这里。

FWIW,Flink的状态函数API可能更适合您正在做的事情,在这种情况下,您将使用延迟消息。

 类似资料:
  • 如何在Vertx中处理延迟作业列表(实际上是数百个HTTP GET请求,到禁止快速请求主机的有限API)?现在,我正在使用此代码,它被阻止,因为Vertx一次启动所有请求。希望在每个请求之间有5秒的延迟来处理每个请求。

  • 我们在RHEL 7.0 VM上部署了一个Java/spring/Tomcat应用程序,它使用AlejandRorivera/Embedded-RabbitMQ,一旦部署了war,它就启动Rabbitmq服务器,并连接到它。我们有多个队列用来处理和过滤事件。 流程如下所示: 我们接收到的事件->发布事件队列->侦听器类筛选事件->发布到另一个队列进行处理->我们发布到另一个队列进行日志记录。 问题是

  • 我有一系列应用程序使用来自SQS队列的消息。如果由于某种原因,这些消费者中的一个出现故障,并且停止使用消息,我希望得到通知。做这件事最好的方法是什么? 请注意,其中一些队列每2-3天只能将一条消息放入队列,因此等待队列中的消息数触发通知对我来说不是一个好的选择。 我正在寻找的是可以监视一个SQS队列并说“这个消息已经在这里一个小时了,什么都没有处理它……让某人知道。”

  • top看到单个CPU 100%时,就是垂直扩展的时候了。如果需要让CPU使用率最大化,可以配置Redis实例数对应CPU数, Redis实例数对应端口数(8核Cpu, 8个实例, 8个端口), 以提高并发。单机测试时, 单条数据在200字节, 测试的结果为8~9万tps。(未实测)。 另外,对于命令的复杂度一定要关注。

  • 正在发生的事情: 在第8点。无论处理程序是否取消事件,由于取消检查默认为false,第二个事件已经排队。Guava的EventBus坚持在启动下一个事件之前完成当前的处理程序运行,我确信这有其用处,但这不是我想要的。 尝试黑客: