当前位置: 首页 > 知识库问答 >
问题:

Kafka领导人选举导致Kafka流崩溃

巫马俊力
2023-03-14

我有一个Kafka Streams应用程序,使用3个代理和3个复制因子从Kafka集群进行消费和生产。除了消费者偏移主题(50个分区)之外,所有其他主题都只有一个分区。

当代理尝试首选副本选择时,Streams应用程序(运行在与代理完全不同的实例上)将失败,并出现错误:

Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
    ...
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

Streams应用程序尝试成为分区的领导者是否正常,因为它在不属于Kafka集群的服务器上运行?

我可以通过以下方式复制这种行为:

  1. 杀死其中一个代理(然后另外2个接管所有分区的领导,按照预期,杀死的代理是他们的领导)
  2. 把被杀的经纪人带回来
  3. bin/kafka-preferred-replica-election.sh--zoo的localhost
  4. 触发首选副本领导人选举

我的问题似乎与这个报告的失败相似,所以我想知道这是否是一个新的Kafka Streamsbug。我的完整堆栈跟踪与报告失败中链接的要点完全相同(此处)。

另一个潜在的有趣细节是,在领导者选举期间,我在代理的< code>controller.log中得到这些消息:

[2017-04-12 11:07:50,940] WARN [Controller-3-to-broker-3-send-thread], Controller 3's connection to broker BROKER-3-HOSTNAME:9092 (id: 3 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to BROKER-3-HOSTNAME:9092 (id: 3 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

我最初认为这个连接错误是罪魁祸首,但在领导者选举导致 Streams 应用程序崩溃后,如果我重新启动 Streams 应用程序,它会正常工作,直到下一次选举,我根本没有接触经纪人。

所有服务器(3个Kafka代理和Streams应用程序)都在EC2实例上运行。

共有1个答案

贾飞章
2023-03-14

这个问题现在在0.10.2.1中得到修复。如果您不能选择它,请确保您在streams配置中设置了这两个参数,如下所示:

final Properties props = new Properties();
...
props.put(ProducerConfig.RETRIES_CONFIG, 10);  
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
 类似资料:
  • 我和Kafka一起工作已经一年了,没有自发的领导人更迭发生。但在过去的两周里,这种情况经常发生。Kafka登录: [2015-09-27 15:35:14,826]信息[broker 2上的ReplicaFetcherManager]已删除分区的提取器[myTopic](kafka.server.ReplicaFetcherManager) [2015-09-27 15:35:14,830]信息截

  • 我的Kafka版本是。集群中有两个代理,4个主题,每个主题有4个分区。 当我跑的时候 对于所有主题/分区,我认为Broker1是领导者。 如何平衡领导的负载? 例如,对于主题1和主题2,代理1作为领导者,而对于主题3和主题4,代理2作为领导者。

  • 我正在努力通过SSL移动所有Kafka流量。每个区域有两个集群。 使用Kafka 2.7.0版。 除一个集群外,所有区域和所有集群都可以通过SSL正常工作。 在其他工具中,我使用< code>kafkacat来探测集群。 当通过明文连接对此集群执行时,它会列出所有代理、主题和分区,并显示每个分区的领导者: 当通过 SSL 执行相同的命令时,会发现: 0经纪人 列出主题和分区,但没有标题 对该地区的

  • 我在一个视频教程中看到,当制作人发布消息时,Kafka Broker支持3种类型的确认。 0-开火并忘记 1-领导确认 2-确认所有经纪人 我正在使用Kafka的Java API发布消息。这是必须为每个使用服务器的代理设置的吗。每个经纪人的特定属性,还是必须由制作人设定?如果必须由制作人设置,请解释如何使用Java API设置。

  • 我试图用LWJGL编写一个opengl渲染器。为了打开窗户,我用的是GLFW。但是,当我调用glfwCreateWindow时,它会崩溃,出现以下错误: Java运行时环境检测到一个致命错误: 谢了!

  • 我将开始讲述我想要实现的目标。所以我的设置是: > 6 VM运行Ubuntu 14.04版本 - 在其中3个中,我已经设置了Kafka,在3中,我创建了动物园管理员实例。 我开始生产和消费,一切似乎都很好,没有问题。 现在我想使用kafka 0.9版本的SSL来保护设置。我只想在客户端和kafka代理之间设置SSL,以便它们能够安全通信。我遵循以下链接。 我所做的唯一更改是:我用kafka代理的I