当前位置: 首页 > 知识库问答 >
问题:

Storm acker和保证消息处理的混淆

朱昊乾
2023-03-14

现在我正在学习Storm的保证消息处理,对这一部分的一些概念感到困惑。

为了保证喷口发出的信息得到充分处理,Storm使用acker来实现这一点。每次喷口发出一个元组时,acker将分配初始化为0的“ack val”来存储元组树的状态。每次该元组的下游螺栓发出新元组或确认一个“旧”元组时,元组ID将与“ack val”异或。acker只需要检查“ack val”是否为0,就可以知道元组已被完全处理。让我们看看下面的代码:

public class WordReader implements IRichSpout {
    ... ...
while((str = reader.readLine()) != null){
    this.collector.emit(new Values(str), str);
    ... ...
}

上面的代码段是“Storm入门”教程中的一个字计数程序。在emit方法中,第二个参数“str”是messageId。我被这个参数弄糊涂了:1)据我所知,每次发出元组(即消息)时,无论是在喷口中还是在螺栓中,Storm都应该负责为该消息分配64位messageId。对吗?或者这里的“str”只是这条消息的一个人类可读的别名?2) 不管1)的答案是什么,这里的“str”在两条不同的消息中都是相同的单词,因为在文本文件中应该有许多重复的单词。如果这是真的,那么Storm如何区分不同的消息?这个参数是什么意思?3) 在一些代码段中,我看到一些喷口使用以下代码在喷口发出方法中设置消息Id:

public class RandomIntegerSpout extends BaseRichSpout {
    private long msgId = 0;
    collector.emit(new Values(..., ++msgId), msgId);
}

这更接近于我所认为的:不同消息的消息ID应该完全不同。但对于这段代码,另一个困惑是:不同执行者之间的私有字段“msgId”会发生什么?因为每个执行器都有自己的msgId初始化为0,所以不同执行器中的消息将从0、1、2等中命名。那么Storm如何区分这些信息呢?

我是Storm的新手,所以也许这些问题很幼稚。希望有人能帮我弄清楚。谢谢

共有1个答案

郭俊人
2023-03-14

关于消息ID是通用的:在内部,它可能是一个64位的值,但这个64位的值是作为散列计算的,散列来自Spout中emit()中提供的msgID对象。因此,您可以将任何对象作为消息ID(两个对象散列为相同值的概率接近于零)。

关于使用str:我认为在本例中,str包含一行(而不是一个单词),并且文档不太可能包含完全相同的行两次(如果没有空行,可能会有很多空行)。

关于计数器作为消息id:您的观察完全正确——如果多个喷口并行运行,这将导致消息id冲突,并将破坏容错性。

如果您想修复计数器方法,每个计数器应该以不同的方式初始化(最好是从1...#Spout任务)。您可以为此使用taskID(它是唯一的,可以通过Spout.open()中提供的TopologyContext访问)。基本上,您可以获取所有并行spout任务的所有taskID,对它们进行排序,并为每个spout任务分配其排序号。此外,您需要通过“并行喷口的数量”而不是1来递增。

 类似资料:
  • Storm 通过 Trident 对保证消息处理提供了不同的 level ,包括 best effort(尽力而为),at least once (至少一次)和exactly once(至少一次). 这张页面描述如何保证至少处理一次. What does it mean for a message to be "fully processed"?(一条信息被完全处理是什么意思) 一个 tuple

  • 我正在尝试运行WordCount示例以保证消息处理。 有一个喷口 WSpout-使用msgID发出随机句子 还有两个螺栓 > SplitSentence-在单词中拆分句子并发出锚定 字数计数-打印字数计数。 我想用下面的代码实现的是,当一个句子中的所有单词都计算完毕时。必须确认与该句子对应的喷口。 我向收藏家表示感谢。仅在最后一次bolt WordCount时确认(元组)。奇怪的是ack()的in

  • 在下面的示例中,我有两个正在处理来自kafka的消息的服务实例,但我希望确保只在之后处理。 显然,通过将一个实例配置为仅从特定分区消费,可以很容易地解决这种情况,该分区将存储带有公共标识符的消息: 现在顺序得到了保证,将永远不会在之前处理。 但是,我在想这个问题是否可以用另一种方式来解决,直接在代码中而不是依赖基础设施?这看起来可能是微服务架构中的一个标准问题,但我不确定哪种方法是解决它的首选方法

  • 流处理和传统消息处理的基本区别是什么?正如人们所说,kafka是流处理的好选择,但本质上,kafka是一个类似于ActivMQ、RabbitMQ等的消息传递框架。 为什么我们通常不说ActiveMQ也适合流处理呢。 消费者消费消息的速度是否决定了它是否是流?

  • 通常,我希望将消息发送到另一个路由来处理它,但我不希望为后续步骤修改该消息。做这件事最好的方法是什么? 我发现的另一个选择是使用异步sedaendpoint,它将原始消息返回给生产者并处理副本,但这会引入异步行为,而异步行为可能并不总是可取的。 看来一定有更好的办法?

  • 面试题 如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题? 面试官心理分析 这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条,不能多,就是前面说的重复消费和幂等性问题。不能少,就是说这数据别搞丢了。那这个问题你必须得考虑一下。 如果说你这个是用 MQ 来传递非常核心的消息,比如说计费、扣费的一些消息,那必须确保这个 MQ 传递过程中绝对不会把计费消息给弄丢。 面试题剖