当前位置: 首页 > 知识库问答 >
问题:

在Storm中用螺栓链固定的正确方法

支智志
2023-03-14

我只是想确定一下我知道Storm中的工作原理。我有一个喷口和两个螺栓链在一起。Spout向Bolt1发射元组,Bolt1又向Bolt2发射元组。我想要bolt2来处理从Spout发送的初始元组,但我不确定如何处理。

为了保证容错性(即:元组是重新生成的),我想在bolt2中ack由Spout发出的元组,以防它在进程中某个地方失败,这样它就可以重新生成。

 _collector.emit(new Values(queue.dequeue())
def execute(tuple: Tuple) {
 _collector.emit(tuple, new Values("stuff"))
}
def execute(tuple2: Tuple) {
 _collector.emit(tuple2, new Values("foo"))
}

此时,tuple2中的元组是从Bolt1发送的元组(其中包含字符串“stuff”的元组)。
因此,如果我在Bolt2中发送ack,这将ack来自Bolt1的元组,而不是从spout发送的元组。正确?

我怎样才能确认从喷口发送的元组?我是不是应该把最初的喷口放在所有其他喷口上,这样我就可以在最后一个插销里找回来,然后把它收起来?

我读了Nathan的教程,我得到的印象是,我可以在发出tuple2之后立即在Bolt1(从Spout)中接收到的元组。这将把新发出的tuple2链接到Spout发送的原始元组,因此当Bolt2对tuple2进行编译时,它实际上对来自Spout的原始元组进行编译。这是真的吗?

如果我在解释中遗漏了什么,请告诉我。

共有1个答案

曹普松
2023-03-14

对于那些感兴趣的人,我已经找到了一个解决方案,通过询问暴风小组。我需要的是在Spout中以以下方式发出元组(具有唯一的ID):

喷口:

 //ties in tuple to this UID
 _collector.emit(new Values(queue.dequeue(), *uniqueID*) 

那么Bolt1将只在它将元组发送到Bolt2之后对元组进行ack

 //emit first then ack
 _collector.emit(tuple, new Values("stuff")) //**anchoring** - read below to see what it means
 _collector.ack(tuple) 
 _collector.ack(tuple) 

希望这有帮助。

 类似资料:
  • 我的拓扑看起来是这样的:Bolt A向Bolt B和C发出相同的元组,每个元组都将数据持久化到Cassandra。这些操作不是幂等的,并且包括对两个不同计数器列族的更新。我只对元组失败和在Cassandra的某些异常(不是读/写超时,只是QueryConsistency或Validation异常)中重播它感兴趣。问题是,如果bolt B失败,相同的元组将从spout重播,并再次发送到bolt C,

  • 我有一个非常简单的Storm螺栓,从Kafka喷口输入,应该只是写到标准输出。它扩展了BaseRichBolt。有关的两种方法是:

  • 我正在尝试测量拓扑中每个bolt的延迟。Storm给出的延迟数是不够的,因为我们想要计算百分位数。在我当前的设置中,我通过测量完成execute方法(包括发出调用)所需的时间来测量bolt的延迟。该方法的假设是,即使当前bolt实例和下一个bolt实例在拓扑结构中共享同一个执行器,收集器的emit也会立即返回,而不需要调用下一个bolt实例执行方法。

  • 因此,如果您使用基于JUnit的单元测试,是否建议您运行一个小型模拟拓扑(?)并测试该拓扑下的(或)的隐含契约?或者,是否可以使用JUnit,但这意味着我们必须仔细模拟Bolt的生命周期(创建它、调用、嘲弄等)?在这种情况下,被测类(螺栓/喷口)有哪些一般的测试点需要考虑? 其他开发人员在创建正确的单元测试方面做了什么? 我注意到有一个拓扑测试API(参见:https://github.com/x

  • 我有一个小拓扑。它有一个Kafka喷口,一个从喷口读数的螺栓(螺栓a)。螺栓A发射到两个螺栓(螺栓B和螺栓C)。我使用了字段分组。螺栓A发出两种不同类型的数据。一个用于螺栓B,另一个用于螺栓C。 我的问题是,我是否可以这样配置storm,使用于Bolt B的数据总是流向Bolt B的实例,而用于Bolt C的数据总是流向Bolt B的实例?目前,我正在检查螺栓中接收的数据,并跳过不需要的数据。

  • 在我的拓扑中,当元组从spout转移到bolt或从bolt转移到bolt时,我看到大约1-2 ms的延迟。我使用纳秒时间戳来计算延迟,因为整个拓扑运行在单个Worker中。拓扑是在集群中运行的,集群运行在具有生产能力的硬件中。 根据我的理解,在这种情况下,元组不需要序列化/反序列化,因为所有东西都在单个JVM中。我已经将大多数喷流和螺栓的并行性提示设置为5,并且喷流仅以每秒100的速率产生事件。我