当前位置: 首页 > 知识库问答 >
问题:

kafka new producer无法在其中一个代理关闭后更新元数据

盖昀
2023-03-14

timeoutException:60000 ms后更新元数据失败。

10分钟后,由于Broker2是新的领导者,我希望生产者发送数据给Broker2,但它继续失败,给出了上述异常。lastRefreshMs和lastSuccessfullRefreshMs仍然相同,尽管生产者的metadataExpireMs是30万。

我正在使用Kafka新的生产者方面的生产者实现。

配置参数。

{request.timeout.ms=30000, retry.backoff.ms=100, buffer.memory=33554432, ssl.truststore.password=null, batch.size=16384, ssl.keymanager.algorithm=SunX509, receive.buffer.bytes=32768, ssl.cipher.suites=null, ssl.key.password=null, sasl.kerberos.ticket.renew.jitter=0.05, ssl.provider=null, sasl.kerberos.service.name=null, max.in.flight.requests.per.connection=5, sasl.kerberos.ticket.renew.window.factor=0.8, bootstrap.servers=[10.201.83.166:9500, 10.201.83.167:9500], client.id=rest-interface, max.request.size=1048576, acks=1, linger.ms=0, sasl.kerberos.kinit.cmd=/usr/bin/kinit, ssl.enabled.protocols=[TLSv1.2, TLSv1.1, TLSv1], metadata.fetch.timeout.ms=60000, ssl.endpoint.identification.algorithm=null, ssl.keystore.location=null, value.serializer=class org.apache.kafka.common.serialization.ByteArraySerializer, ssl.truststore.location=null, ssl.keystore.password=null, key.serializer=class org.apache.kafka.common.serialization.ByteArraySerializer, block.on.buffer.full=false, metrics.sample.window.ms=30000, metadata.max.age.ms=300000, security.protocol=PLAINTEXT, ssl.protocol=TLS, sasl.kerberos.min.time.before.relogin=60000, timeout.ms=30000, connections.max.idle.ms=540000, ssl.trustmanager.algorithm=PKIX, metric.reporters=[], compression.type=none, ssl.truststore.type=JKS, max.block.ms=60000, retries=0, send.buffer.bytes=131072, partitioner.class=class org.apache.kafka.clients.producer.internals.DefaultPartitioner, reconnect.backoff.ms=50, metrics.num.samples=2, ssl.keystore.type=JKS}

用例:

1-启动BR1和BR2产生数据(领导者是BR1)

如果生产者发送最新的成功数据给BR1,然后所有经纪人都关闭了,生产者期望BR1重新站起来,尽管BR2是新的领导者。这是意料之中的行为吗?

共有1个答案

汪信鸥
2023-03-14

花了几个小时后,我弄清楚了Kafka在我情况下的行为。可能这是一个bug,也可能是由于隐藏的原因需要这样做,但实际上,如果我要做这样的实现,我不会这样做:)

当所有代理都关闭时,如果您只能启动一个代理,那么这个代理必须是最后关闭的代理才能成功生成消息。

假设你有5个经纪人;BR1、BR2、BR3、BR4和BR5。如果所有的代理都关闭了,并且最后一个死亡的代理是BR3(它是最后一个领导者),尽管您启动了所有的代理BR1、BR2、BR4和BR5,但除非启动BR3,否则这将没有任何意义。

 类似资料:
  • 我是KAFKA的新手,我知道这个问题在stack overflow上被问了很多次,但没有一个解决方案对我有效,所以我在这里再次问同样的问题,试试我的运气。我已经在Centos7 VM上下载并安装了KFKA。虚拟机在我的笔记本电脑上。当我从命令行运行KAFKA生产者和消费者时,它工作得很好。下一步,我想创建一个Java生产者,但它总是超时,并出现以下异常。 生产者的 Java 代码是: 对于引导服务

  • 当所有的经纪人都起来的时候,一切都是好的。但是,如果我先杀死(按开始顺序),代理消息会被发送到代理,但使用者不能接收到任何消息,消息不会丢失。启动该代理后,使用者立即接收消息。 关闭broker实例后的使用者日志: 再次启动丢失的代理后的使用者日志: 谢谢

  • 我试图将数据复制到公里数组中,但我有一个错误,说不能转换类型。我如何修复这个问题?

  • 我需要在另一个相关实体更新后对一个实体执行更新。 我有两个实体:和,关系为1:N。两者都有字段。状态取决于所有子状态字段。因此,如果更新了一个,我需要重新计算的新状态并持久化/更新它。 我实现了一个监听器: 监听器在中进行了注释,并且正在正确调用它。但是,在流程完成后,仍然保持旧状态,即使使用正确的新状态调用。

  • 我有一个用户类,每个用户实例都有一个配置文件。个人资料有一个avatarImage属性,我想更新它。我可以在运行时更新用户的字段,但不能更新该用户配置文件的字段。我正在使用springsecurity获取当前用户,我想更新他的avatarImage,但在运行时我得到了一个SQLException。 用户域类: } 配置文件域类: 用户控制器: 用户服务: 错误/堆栈跟踪: 我花了两天时间试图解决这

  • 问题内容: 有人知道如何从其他活动中关闭一个活动吗?例如:我有3个活动(活动A,B和C),并且可以从活动C中关闭活动A。我的活动结构是活动A->活动B->活动C如何从中关闭活动A活动C? 我曾尝试此代码: 但是该代码只能关闭活动B的活动A,而不能直接关闭活动C的活动A。 有谁知道直接从其他活动关闭活动?谢谢.. 问题答案: