当前位置: 首页 > 知识库问答 >
问题:

Apache storm kafka只从一个主题的一半分区中读取数据

松波
2023-03-14

我们的生产Storm集群出现了一个我们无法解决的问题。

在某个时候,似乎kafka spout停止了从一半的主题分区中读取。有40个分区,它只读取其中的20个。在这种情况开始发生的时候,我们找不到我们对Storm星团或Kafka所做的任何改变。

我们更改了使用者组 ID,并将输出配置起始时间设置为偏移量请求。它仍然只连接到相同的20个分区。我们已经查看了节点 /

我们已经验证了消息正在被发布到所有40个分区。

Kafka版本是0.9.0.1,Storm版本是1.1.0。

关于如何调试或在哪里查看的任何提示都将不胜感激。我有没有提到这正在生产中发生?我有没有提到它是一周前开始的,我们今天早上才注意到?:(

附加信息:我们在Kafka状态更改日志中发现了一些错误(分区9是受影响的分区之一,日志中的时间戳看起来与问题开始的时间有关)

kafka.common.NoReplicaOnlineException: No replica for partition 
[transcription-results,9] is alive. Live brokers are: [Set()], Assigned replicas are: [List(1, 4, 0)]
[2018-03-14 03:11:40,863] TRACE Controller 0 epoch 44 changed state of replica 1 for partition [transcription-results,9] from OnlineReplica to OfflineReplica (state.change.logger)
[2018-03-14 03:11:41,141] TRACE Controller 0 epoch 44 sending become-follower LeaderAndIsr request (Leader:-1,ISR:0,4,LeaderEpoch:442,ControllerEpoch:44) to broker 4 for partition [transcription-results,9] (state.change.logger)
[2018-03-14 03:11:41,145] TRACE Controller 0 epoch 44 sending become-follower LeaderAndIsr request (Leader:-1,ISR:0,4,LeaderEpoch:442,ControllerEpoch:44) to broker 0 for partition [transcription-results,9] (state.change.logger)
[2018-03-14 03:11:41,208] TRACE Controller 0 epoch 44 changed state of replica 4 for partition [transcription-results,9] from OnlineReplica to OnlineReplica (state.change.logger)
[2018-03-14 03:11:41,218] TRACE Controller 0 epoch 44 changed state of replica 1 for partition [transcription-results,9] from OfflineReplica to OnlineReplica (state.change.logger)
[2018-03-14 03:11:41,226] TRACE Controller 0 epoch 44 sending become-follower LeaderAndIsr request (Leader:-1,ISR:0,4,LeaderEpoch:442,ControllerEpoch:44) to broker 4 for partition [transcription-results,9] (state.change.logger)
[2018-03-14 03:11:41,230] TRACE Controller 0 epoch 44 sending become-follower LeaderAndIsr request (Leader:-1,ISR:0,4,LeaderEpoch:442,ControllerEpoch:44) to broker 1 for partition [transcription-results,9] (state.change.logger)
[2018-03-14 03:11:41,450] TRACE Broker 0 received LeaderAndIsr request (LeaderAndIsrInfo:Leader:-1,ISR:0,4,LeaderEpoch:442,ControllerEpoch:44),ReplicationFactor:3),AllReplicas:1,4,0) correlation id 158 from controller 0 epoch 44 for partition [transcription-results,9] (state.change.logger)
[2018-03-14 03:11:41,454] TRACE Broker 0 handling LeaderAndIsr request correlationId 158 from controller 0 epoch 44 starting the become-follower transition for partition [transcription-results,9] (state.change.logger)
[2018-03-14 03:11:41,455] ERROR Broker 0 received LeaderAndIsrRequest with correlation id 158 from controller 0 epoch 44 for partition [transcription-results,9] but cannot become follower since the new leader -1 is unavailable. (state.change.logger)
//... removed some TRACE statements here
[2018-03-14 03:11:41,908] WARN Broker 0 ignoring LeaderAndIsr request from controller 1 with correlation id 1 epoch 47 for partition [transcription-results,9] since its associated leader epoch 441 is old. Current leader epoch is 441 (state.change.logger)
[2018-03-14 03:11:41,982] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:Leader:1,ISR:0,1,4,LeaderEpoch:441,ControllerEpoch:44),ReplicationFactor:3),AllReplicas:1,4,0) for partition [transcription-results,9] in response to UpdateMetadata request sent by controller 1 epoch 47 with correlation id 2 (state.change.logger)
[2018-03-22 14:43:36,098] TRACE Broker 0 received LeaderAndIsr request (LeaderAndIsrInfo:Leader:-1,ISR:,LeaderEpoch:444,ControllerEpoch:47),ReplicationFactor:3),AllReplicas:1,4,0) correlation id 679 from controller 1 epoch 47 for partition [transcription-results,9] (state.change.logger)

可能由此错误引起:https://issues.apache.org/jira/browse/KAFKA-3963

但我们如何才能从中恢复呢?


共有1个答案

柳业
2023-03-14

我首先在/brokers/topics下查看Kafka的Zookeeper,以验证所有分区都已列出。斯托姆-Kafka就是从那里读取分区的。

 类似资料:
  • 我有一个带有2个分区的源主题,我正在用同一个应用程序启动2个kafka streams应用程序。id,但不同的接收器主题。 1) 这两个应用程序实例是否会从不同的分区接收数据? 2)如果其中一个应用程序被杀死,另一个实例会自动从两个实例中消耗吗? 3) 我如何证明上述情况?

  • 我想知道一个使用者如何从多个分区使用消息,具体来说,从不同的分区读取消息的顺序是什么? 我看了一眼源代码(Consumer,Fetcher),但我不能完全理解。 这是我以为会发生的: 分区是顺序读取的。也就是说:在继续下一个分区之前,一个分区中的所有消息都将被读取。如果我们达到< code>max.poll.records而没有消耗整个分区,则下一次读取将继续读取当前分区,直到耗尽为止,然后继续下

  • 我们使用的是 Kafka 2.5.1 版本集群。最近注意到其中一个主题分区数据大小不均匀。与其余分区相比,一个特定分区的大小增加了 300%。这在群集中造成了不均衡的磁盘利用率。 已验证使用者滞后,看起来像其他分区一样正常 此外,我们使用默认分区程序和设置为默认值的“metadata.max.age.ms”配置,即 300000ms(5 分钟) 我们是如何使分区数据均匀分布的?

  • 我在Kafka Topic内部有500万条消息。 我必须加入具有相同分区密钥的消息作为单个消息的一部分,并发送给消费者主题[例如:对于密钥1234-Messge1,消费者应该收到单个消息而不是100万消息] Kafka端是否有可用的Kafka API,使用它我可以读取组中具有相同Partition键的所有消息,而不是像传统的spring boot Kafka Listener那样一次读取单个消息。

  • 有一个16个分区的Kafka主题 使用给定的消费者组名称,我们目前正在启动单个消费者来阅读该主题。 > 单个消费者是否从该主题的(仅)读取?如果带有消息为空,消费者是否从下一个分区开始读取(...等等)? 我们可以选择启动多个消费者(使用相同的消费者组名称)来读取相同的主题(有16个分区)。为了并行读取多个分区,可以维护多少消费者?

  • 问题内容: 如何从代码中获取任何kafka主题的分区数。我研究了许多链接,但似乎没有一个起作用。 提及一些: http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count- through-simpleclient-api http://grokbase.com/t/kafka/users/151cv3htga/ge