在我的拓扑中,当元组从spout转移到bolt或从bolt转移到bolt时,我看到大约1-2 ms的延迟。我使用纳秒时间戳来计算延迟,因为整个拓扑运行在单个Worker中。拓扑是在集群中运行的,集群运行在具有生产能力的硬件中。
根据我的理解,在这种情况下,元组不需要序列化/反序列化,因为所有东西都在单个JVM中。我已经将大多数喷流和螺栓的并行性提示设置为5,并且喷流仅以每秒100的速率产生事件。我不认为高延迟是由于事件的排队,因为我没有看到延迟随着时间的推移而增加。也没有增加记忆。日志级别设置为错误。CPU使用率在200%到300%的范围内。
是什么导致了这种延迟?我原以为只有几个我们的元组传输。
我假设您使用的是一个已发布的Storm版本,而不是2.0.0-Snapshot,因为队列实现在该版本中发生了变化。
我认为延迟很可能是因为Storm在将元组交付给消费者之前对元组进行了批处理。查看https://github.com/apache/storm/blob/v1.2.1/storm-core/src/jvm/org/apache/storm/utils/disruptorqueue.java#L247,并查看该文件中的Flusher类。当一个spout/bolt发布一个元组时,它被放入_currentbatch列表中。它会一直保持在那里,直到接收到足够多的元组,从而使批处理“足够大”(您可以查看_inputBatchSize变量,以确定这是什么时候),或者直到刷新器被触发(默认情况下每毫秒发生一次)。
因此,如果您使用基于JUnit的单元测试,是否建议您运行一个小型模拟拓扑(?)并测试该拓扑下的(或)的隐含契约?或者,是否可以使用JUnit,但这意味着我们必须仔细模拟Bolt的生命周期(创建它、调用、嘲弄等)?在这种情况下,被测类(螺栓/喷口)有哪些一般的测试点需要考虑? 其他开发人员在创建正确的单元测试方面做了什么? 我注意到有一个拓扑测试API(参见:https://github.com/x
我实现了一个从Kafka队列读取消息的heron拓扑。因此,我的拓扑有一个kafka喷口和一个计算从队列读取的消息数的bolt。
假设我在Storm集群中有2个喷口和3个螺栓,并且有两个工人节点。这些喷口和螺栓将在这些工人之间共享(例如第一个工人有1个喷口和2个螺栓,第二个工人有1个喷口和1个螺栓)还是每个工人有2个喷口和3个螺栓,最终在整个集群中有4个喷口和6个螺栓?
我对Apache Storm有一个奇怪的问题。我有一个Kafka连接到Kafka集群,里面有10条消息。 螺栓接收每条消息并正确处理,因为在Storm UI中,螺栓被列为“已确认”。然而,storm UI下面列出的喷口表示所有元组都失败了。 我相信这会导致喷口再次发出所有的信息。。。因此,我看到一个Storm螺栓打印出消息1-10,然后以相同的顺序一次又一次地打印出来。 我适当地调用了和方法,我只
编辑:我向Bolt添加了一个。ack()(这要求我使用一个丰富的Bolt而不是基本的Bolt)并且遇到了同样的问题--没有任何信息告诉我Bolt正在处理元组。 如果有关系的话,我会在EC2实例上的CentOS映像上运行这个。如有任何帮助,不胜感激。 查看生成的Storm worker日志,我看到这一行: 下面几行如下: 工作日志的其余部分没有显示螺栓处理的消息的日志/打印。我不明白为什么螺栓似乎没
我有一个非常简单的Storm螺栓,从Kafka喷口输入,应该只是写到标准输出。它扩展了BaseRichBolt。有关的两种方法是: