当前位置: 首页 > 知识库问答 >
问题:

Storm Kafka Spout上的最大元组重播数

阚通
2023-03-14

我们用Storm和Kafka喷口。当我们的消息失败时,我们希望重播它们,但在某些情况下,错误的数据或代码错误将导致消息总是失败一个螺栓,所以我们将进入无限的重播循环。很明显,当我们发现错误时,我们正在修复错误,但希望我们的拓扑具有一般的容错性。在一个元组被重播了N次以上之后,我们如何能够ack()?

通过查看Kafka Spout的代码,我看到它被设计为使用指数回退计时器重试,并对PR状态进行注释:

“spout不会终止重试循环(我确信它不应该这样做,因为它不能报告关于中止reqeust时发生的失败的上下文),它只处理延迟重试。拓扑中的bolt最终仍然会调用ack()而不是fail()来停止循环。”

我见过StackOverflow的回复建议编写一个自定义的喷注,但是如果有推荐的方法在一个bolt中这样做的话,我宁愿不去维护Kafka喷注内部的自定义补丁。

什么是正确的方法来做这件事在一个螺栓?我在元组中没有看到任何状态来公开它被重播了多少次。

共有1个答案

董高洁
2023-03-14

Storm本身并不为您的问题提供任何支持。因此,定制的解决方案是唯一的出路。即使您不想修补kafkaspout,我认为,引入一个计数器并打破其中的重播循环,将是最好的方法。作为另一种选择,您也可以继承kafkaspout并在子类中放置一个计数器。这当然有点类似于补丁,但可能不那么具有侵扰性,也更容易实现。

如果要使用Bolt,可以执行以下操作(这还需要对kafkaspout或其子类进行一些更改)。

  • 为每个元组分配一个唯一的ID作为附加属性(可能,已经有一个唯一的ID可用;否则,可以引入一个“counter-ID”或仅引入整个元组(即所有属性)来标识每个元组)。
  • 通过ID上的fieldsgroupingkafkaspout后面插入一个bolt(以确保重播的元组流式传输到同一个bolt实例)。
  • 在bolt中,使用hashmap 缓冲所有元组并计算(重试)次数。如果计数器小于阈值,则转发输入元组,以便由随后的实际拓扑进行处理(当然,您需要适当地锚定该元组)。如果计数大于阈值,则对元组进行ack以中断循环,并从散列映射中删除其条目(您可能还希望记录所有失败的元组)。
  • 为了从hashmap中删除成功处理的元组,每次在kafkaspout中添加元组时,您需要将元组ID转发到bolt,以便它可以从hashmap中删除该元组。只需为kafkaspout子类声明第二个输出流并覆盖spout.ack(...)(当然,您需要调用super.ack(...)以确保kafkaspout也获得该ack)。

但是,这种方法可能会消耗大量内存。作为在hashmap中为每个元组提供一个条目的替代方法,您还可以使用第三个流(它与其他两个流一样连接到bolt),如果元组失败(即在spout.fail(...))时转发元组ID。每次bolt从这第三个流接收到“Fail”消息时,计数器就会增加。只要hashmap中没有条目(或者没有达到阈值),bolt就简单地转发元组进行处理。这应该会减少使用的内存,但需要在您的spout和bolt中实现一些更多的逻辑。

这两种方法都有缺点,即每个被加密的元组都会给新引入的bolt带来额外的消息(因此增加了网络流量)。对于第二种方法,您似乎只需要为之前失败的元组向bolt发送一个“ack”消息。但是,您不知道哪些元组失败了,哪些没有失败。如果希望消除这种网络开销,可以在kafkaspout中引入第二个hashmap,用于缓冲失败消息的ID。因此,只有当一个失败的元组被成功地重播时,您才能发送一个“ACK”消息。当然,第三种方法使得要实现的逻辑更加复杂。

如果不对kafkaspout进行某种程度的修改,我看不到您的问题的解决方案。我个人会修补kafkaspout或者使用第三种方法,在kafkaspout子类和bolt中使用hashmap(因为与前两种解决方案相比,它占用的内存很少,也不会给网络带来很多额外的负载)。

 类似资料:
  • 我有一个Java计算问题,其中我得到了一个整数数组: 例如: 3-2-10 0 1 我应该计算出可以从这些整数形成的最小整数和最大三元组是什么。(在这种情况下,最小值=-30,最大值=60) 我最初认为最大值总是正的,最小值总是负的。 因此, 我最初的算法是: 扫描数组并取出其中的3个最大元素,存储到数组中。 同时,取出里面的3个最小的元素,存储到另一个数组中。 通过不等式,我们可以推断如下: v

  • 本节通过求数组的最大和最小值来提高初学者对数组的一些基本应用。 程序运行结果如下: 最高成绩:100 最低成绩:67 将变量 min 与 max 初值设成数组的第 1 个元素后,再逐一与数组中的各元素相比。比 min 小,就将该元索的值指定给 min 存放,使 min 的内容保持最小。同样,当该元素比 max 大时,就将该元素的值指定给 max 存放,使 max 的内容保持最大。for 循环执行完

  • O(n^2)算法简单。有没有人对此有更好的算法?

  • 问题内容: 我遇到一个问题,指出 考虑以下与学生有关的关系模式 数据库:学生( rollno ,姓名,地址) 报名( rollno,courseno , coursename ) 主键用下划线显示。“学生”和“注册”表中的元组数分别为120和8。(Student * Enroll)中可以出现的元组的最大和最小数量是多少,其中“ *”表示自然连接? 我已经看到了在互联网上几种解决方案,像这样 或本

  • 有没有一种方法可以在小于O(n^2)的时间内做到这一点。 O(nlogn)还是O(n)?