当前位置: 首页 > 知识库问答 >
问题:

Kafka Spout没有从broker读取补偿,只是从Zookeeper读取特定消息后

江煜
2023-03-14

我对阿帕奇Storm和Kafka有意见。KafkaSpout正常读取来自Kafka的消息,但在大约30,000条消息之后,失败的元组开始出现,博尔特没有收到任何消息。

我查看worker.log并看到,当拓扑启动时,它尝试从Zookeeper读取分区信息,然后在broker中读取分区信息,成功了,如您所见:偏移量9539

Read partition information from: /twitter_streaming_tweet_test/STREAMING_TWEET_WRITER_SPOUT/partition_2  --> {"partition":2,"offset":9539,"topology":{"name":"DATA_WRITER_TOPOLOGY","id":"DATA_WRITER_TOPOLOGY-67-1516077955"},"topic":"twitter_streaming_tweet_test","broker":{"port":9092,"host":"zoo1"}}
2018-01-16 17:06:39.732 TWLogger Thread-7-STREAMING_TWEET_WRITER_BOLT-executor[3 3] [INFO] Tweet ID 952850493570654209 was saved to database

Tweets正常保存,然后Kafka Spout尝试从Zookeeper读取分区信息,但找不到任何东西,因此没有处理元组,拓扑陷入卡住。任何人都可以帮我解决这个问题。非常感谢你。

共有1个答案

喻元龙
2023-03-14

你能检查你的max.spout.pending值吗?通常,如果它被设置为非常高的值,那么失败的元组最终会在一段时间后出现在storm stats中,因为如果max.spout.pending非常高,消息会超时。如果你能把喷口/螺栓的storm stats和max.spout.pending值放在一起,将有助于理解这个问题。

 类似资料:
  • 我的目标是使用Flink KafkaSource阅读来自Kafka主题的所有消息。我尝试用批处理和流模式执行。问题如下:当我将env.setParallelism设置为高于2时,我必须使用包含bug的接收器。于是,我设置了例如:< code > streamexecutionenvironment . setparallelism(1); 我想使用的Kafka主题包含3个分区。这是我的代码片段:

  • 问题内容: 我正在尝试使用php从文本文件中读取特定行。这是文本文件: 我如何使用php获取第二行的内容?这将返回第一行: ..但我需要第二个。 任何帮助将不胜感激 问题答案: 文件—将整个文件读入数组

  • 对于套接字服务器应用程序,我创建了一个PacketFragmenter,它读取数据包的长度(在数据包的第二个字节中),然后将数据包发送回管道。 下面是我写的代码: 我在测试中得到了这个堆栈: 但一切都像它应该的那样进行,我得到两个数据包在一行,但他们很好地拼接,下一个处理程序正在做他的工作。 所以我不知道我应该处理这个例外还是忽略它?或者我可以做一件简单的事情来修复它,我根本不是网络专家(一周前开

  • 我想使用rabbitMq队列中Storm喷口中的消息。 现在,我们使用Spring AMQP异步发送和接收来自RabbitMq的消息。 Spring AMQP提供了从队列读取消息的机制(创建监听器或使用注释@RabbitListner)。 问题是我可以让一个侦听器从队列中读取消息。但是,我如何将此消息发送到Storm群上运行的Storm喷口? 拓扑将启动一个集群,但在我的spout的nextTup

  • 问题内容: 我对此有一个文件模拟:… 我想从存在“ HDK1001”字样的行开始阅读,并在世界“ HDK7564”字样处结束阅读 我尝试使用此代码,但无法执行限制 请帮帮我 问题答案: 试试这个代码。

  • 从代码(李克强)收到的错误: