当前位置: 首页 > 面试题库 >

在Storm中延迟执行队列– Kafka,Cassandra,Redis或Beanstalk?

墨阳羽
2023-03-14
问题内容

我有一个风暴拓扑来处理来自Kafka的消息,并根据手头的任务在Cassandra中进行HTTP调用/保存。我会尽快处理这些消息。由于来自外部源(例如HTTP)的响应,很少有消息没有得到完全处理。如果HTTP服务器在一段时间后不响应/返回错误消息以重试,我想为重试实现指数补偿机制。我想不出什么主意就能实现它们。我想知道如果还有其他可以容错的解决方案,那么其中哪一个将是更好的解决方案。由于这是用于实现指数补偿的,因此每个消息将具有不同的延迟时间。

  • 将其发送给 Kafka 中的另一个主题,稍后再使用。 我的首选解决方案 。我知道我们可以使用Kafka偏移量,以便在后期使用消息。我怎么找不到文档/示例代码来做同样的事情。如果有人可以帮助我,这将非常有帮助。
  • 编写消息 Cassandra / Redis, 并编写调度程序以获取未处理的消息并准备使用,然后将其发送给Kafka,以便我的风暴拓扑可以使用它。(其他遗留项目中的现有解决方案(Non Storm))
  • 延迟发送给 Beanstalk (其他遗留项目中的现有解决方案(Non Storm)。我想避免使用该解决方案,而仅在无法使用时使用它)。

虽然这几乎是我想要做的。我无法找到实现Kafka中提到的delayProcessingUntil的文档-使用高级使用者实现延迟队列

过去,我已经完成了数据存储中的预定工作,并使用Beanstalk进行了延迟,但是我更喜欢使用Kafka。


问题答案:

Kafka出口具有内置的指数补偿消息重试功能。您可以通过喷嘴配置来配置初始延迟,延迟乘数和最大延迟。如果螺栓有错误,则可以调用collector.fail(input)。之后,您只需将其喷出即可重试。

https://github.com/apache/storm/blob/v0.10.0/external/storm-
kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java



 类似资料:
  • 本文向大家介绍kafka如何实现延迟队列?相关面试题,主要包含被问及kafka如何实现延迟队列?时的应答技巧和注意事项,需要的朋友参考一下 Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不

  • 最初,我在运行拓扑时只分配了1个executor给QueryNormalizer。执行潜伏期为8.952,处理潜伏期为12.857。 为了更快,我将QueryNormalizer中的执行程序数更改为4。执行延迟更改为197.616,处理延迟更改为59.132。 根据执行延迟的定义-元组在执行方法中花费的平均时间。execute方法可以在不发送元组的Ack的情况下完成。 此外,处理延迟是否应始终低于

  • 问题内容: 有什么技巧可以基于Redis延迟任务执行(即计划)? 也许一些聪明的方法可以将BLPOP延迟给定的秒数? 问题答案: 您可以使用名称中带有时间成分的多个LIST环。作为时间成分,您可以采用当前秒(0-59)。 您总是将任务添加到当前秒的列表中。要获得作业,请仅在保证内容早于给定秒数的那些列表上执行BLPOP(低超时)。 如果您在多个主机上工作,则必须注意时钟是同步的(NTP)。

  • 我正在使用从2个Kafka主题中消费,每个主题都有6个分区。喷口进入单个螺栓以解压缩相关字节,然后进入第二个螺栓以进一步处理。 当我看到storm-ui的时候,这些数字没有多大意义,我希望有人能给我一些启示。 > Kafka的口水说它“收集”了3600个元组,失败了73M个元组。再看下一组的螺栓,我看到有的已经顶起了73米,有的已经顶起了1.3米(没有出现故障),而有的已经顶起了1.3米(没有出现

  • 我把来自https://bitnami.com/stack/cassandra的卡珊德拉安装在云机器上。我克隆了这台机器,所以我有2台机器。一个运行cassandra服务器(1节点cassandra集群),另一个充当客户端并向第一个服务器发出查询。 我使用YCSB-https://github.com/brianfrankcooper/YCSB来执行基准测试。我观察到服务器上的读取延迟非常低,只有