当前位置: 首页 > 知识库问答 >
问题:

Kafka正试图以“恢复模式”向经纪人发送消息

充鑫鹏
2023-03-14

我有以下设置

3个Kafka(v2.1.1)代理5个Zookeeper实例

Kafka代理具有以下配置:

      auto.create.topics.enable: 'false'
      default.replication.factor: 1
      delete.topic.enable: 'false'
      log.cleaner.threads: 1
      log.message.format.version: '2.1'
      log.retention.hours: 168
      num.partitions: 1
      offsets.topic.replication.factor: 1
      transaction.state.log.min.isr: '2'
      transaction.state.log.replication.factor: '3'
      zookeeper.connection.timeout.ms: 10000
      zookeeper.session.timeout.ms: 10000
      min.insync.replicas: '2'
      request.timeout.ms: 30000

生产者配置(使用Spring Kafka)大致如下:

...
acks: all
retries: Integer.MAX_VALUE
deployment.timeout.ms: 360000ms
enable.idempotence: true
...

这个配置我读如下:有三个Kafka代理,但一旦其中一个死了,它是罚款,如果只有至少两个复制和持久的数据发送ack回(=在同步副本)。如果失败,Kafka制作人会持续重试6分钟,但随后放弃。

这就是让我头疼的场景:

  • 所有Kafka和Zookeeper实例都已启动并处于活动状态

问题是

  • 为什么Kafka集群要等到死去的Broker回来?
  • 当生产者意识到代理没有响应时,为什么它不尝试连接另一个代理?
  • 线程完全卡住了6分钟,等待死去的Broker恢复,我怎么能告诉制作人宁愿尝试另一个Broker?
  • 我是错过了什么,还是有什么好的做法来避免这种情况?

共有1个答案

涂羽
2023-03-14

你有很多问题,我将尝试提供我们的经验,希望能对其中一些问题有所帮助。

在我的产品IBM IDR Replication中,我们必须为那些主题正在重新平衡或在集群中失去代理的客户提供健壮性信息。我们的一些测试的结果是,仅仅设置请求超时是不够的,因为在某些情况下,请求将决定不等待整个时间,而是几乎立即执行另一次重试。这会消耗配置的重试次数,即,在某些情况下会绕过超时时间。

因此,我们指示用户使用如下公式...

https://www.ibm.com/support/knowledgecenter/en/SSTRGZ_11.4.0/com.ibm.cdcdoc.cdckafka.doc/tasks/robust.html

“若要调整环境的值,请根据以下公式调整Kafka producer属性retry.backoff.ms和retries:retry.backoff.ms*retries。”

因此,也许可以尝试利用重试和重试。退避。请注意,如果有多个正在进行中,则使用不带幂等性的重试可能会导致批写入无序。。。因此,根据您的业务逻辑进行相应的选择。

根据我们的经验,Kafka制作人写信给经纪人,经纪人是该主题的领导者,因此你必须等待新领导人当选。此时,如果重试过程仍在进行中,则生产者将透明地确定新的领导者并相应地写入数据。

 类似资料:
  • 我们有一个带有三个代理(节点ID 0、1、2)的kafka集群和一个带有三个节点的zookeeper设置。

  • 我正在写一个测试flink两步提交的案例,下面是概述。 正是曾经的kafka生产者。是mysql接收器扩展。是mysql接收器扩展,这个接收器偶尔会抛出一个exeption来模拟检查点失败。 当检查点失败并恢复时,我发现mysql两步提交可以正常工作,但Kafka消费者会读取上次成功的偏移量,Kafka生产者会生成消息,即使他在检查点失败之前就这样做了。 在这种情况下,如何避免重复消息? 谢谢你的

  • 我有一个应用程序,它定期生成原始JSON消息数组。我能够使用avro-tools将其转换为Avro。我这样做是因为由于Kafka-Connect JDBC接收器的限制,我需要消息包含模式。我可以在记事本上打开这个文件,看到它包括模式和几行数据。 现在,我想将其发送到我的中央Kafka代理,然后使用Kafka Connect JDBC接收器将数据放入数据库。我很难理解我应该如何将这些Avro文件发送

  • 我正在尝试仅为代理间kerberos配置Kafka代理。然而,由于它似乎也想通过Kerberos连接到Zookeeper,所以我似乎总是遇到错误。我目前还没有设置任何Zookeeper键。 我的Kafka代理 JAAS 配置如下: 服务器属性 我用上述配置得到的错误如下: 换句话说,我只想要经纪人到经纪人的 kerberos 和经纪人 - 动物园管理员的普通SASL_SSL。这可能吗?

  • 我有一个运行apache kafka 2.2.1的安全MSK集群。如果我在私有子网(amazon Linux2)中创建一个ec2实例并安装java和kafka,我可以执行以下操作来与kafka通信: 然后制作一个文件,如 并将参数中的文件传递给一些kafka cli命令。 我现在正试图在ECS容器中运行kafdrop,我需要传入这个文件。文档说明我可以为和传入一个base64编码版本(我想我可以省

  • 我试图用wifi接口从一台电脑上的Kafka制作人发送消息到另一台电脑上的Kafka经纪人,但消息不出现在Kafka经纪人的指定主题中。 我用华硕无线路由器连接了两台PC机,并禁用了PC机和路由器上的所有防火墙。两台PC都成功地ping了对方。当我转向有线连接时,它工作了,消息被摄取到kafka broker PC上的指定主题。 Kafka制片人: null Listeners=明文://:909