2016-07-05 03:59:25.042 O.A.S.D.Executor[INFO]正在处理-2元组的接收消息:源:__System:-1,流:__Tick,ID:{},[30]
2016-07-05 03:59:25.946 O.A.S.D.Executor[INFO]正在处理-2元组的接收消息:源:__System:-1,流:__Metrics_Tick,ID:{},[60]
我的测试拓扑结构真的很简单,一个KafkaSpout和另一个计数器螺栓。当拓扑运行良好时,for和tuple之间的值为正数;当拓扑停止使用消息时,该值变为负值。所以我很好奇是什么原因导致为-2元组处理接收到的消息的问题,以及如何修复这个问题?
OS:Red Hat Enterprise Linux Server 7.0版(Maipo)
Kafka:0.10.0.0
Storm:1.0.1
在stom邮件列表的帮助下,我能够调整KafkaSpout并解决问题。以下设置对我有效。
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2048);
config.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false);
config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
我通过发送20k-50k批并在突发之间暂停1秒来进行测试。每条消息为2048字节。
我正在运行3个节点集群,我的拓扑有4个喷点,主题有64个分区。
我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。
我有一个Kafka集群(版本:0.10.1.0),有9个代理和10个分区。 我尝试使用camel kafka从java应用程序中获取消息。这是我的pom。xml 这只是我使用的与骆驼Kafka相关的依赖项。下面是骆驼Kafka消费者代码。 我正在使用文档中指定的KafkaURIhttps://camel.apache.org/components/latest/kafka-component.ht
我正在开发一个使用的软件。我有一个用户订阅了多个主题,我想知道是否有一个订单接收来自这些主题的消息。我在我的电脑上尝试了一些组合,但我需要确定这一点。例 null [编辑]我想指定这两个主题各有一个分区,并且只有一个生产者和一个消费者。我需要首先阅读来自第一个主题的所有消息,然后阅读来自另一个主题的消息
有人能帮我弄清楚这件事吗。 谢了!
我有一个带有Kafka使用者的spring应用程序,它使用@KafKalisterner注释。正在使用的主题是日志压缩的,我们可能会遇到必须再次使用主题消息的情况。以编程方式实现这一目标的最佳方法是什么?我们不控制Kafka主题配置。
我们使用Akka流Kafka来生成和消费消息和Strimzi Kafka集群。以下是相关版本: 重构消息发出后,消费者停止工作。我们在主题中确实有一些信息,但消费者只是在无休止地等待。 以下是日志片段: 还有一些要点: 架构注册表配置正确且良好(否则生产者将无法工作)。 主题(和组协调器)很好,我可以通过这样的普通消费者消费消息: 这就是代码卡住的地方——我使用阻塞调用获取2条消息(甚至无法获取1