当前位置: 首页 > 知识库问答 >
问题:

制作人丢失了一些关于Kafka重启的消息

端木淇
2023-03-14

Kafka客户:0.11.0.0-cp1Kafka经纪人:

在Kafka broker滚动重启时,我们的应用程序在发送到broker时丢失了一些消息。我相信滚动重启不应该丢失任何信息。以下是我们正在使用的生产者(将生产者与异步发送()一起使用,而不使用回调/未来等)设置:


val acksConfig: String = "all",
val retriesConfig: Int = Int.MAX_VALUE,
val retriesBackOffConfig: Int = 1000,
val batchSize: Int = 32768,
val lingerTime: Int = 1,
val maxBlockTime: Int = Int.MAX_VALUE,
val requestTimeOut: Int = 420000,
val bufferMemory: Int = 33_554_432,
val compressionType: String = "gzip",
val keySerializer: Class<StringSerializer> = StringSerializer::class.java,
val valueSerializer: Class<ByteArraySerializer> = ByteArraySerializer::class.java

我在日志中看到了这些例外

2019-03-19 17:30:59,224 [org.apache.kafka.clients.producer.internals.Sender] [kafka-producer-network-thread | producer-1] (Sender.java:511) WARN  org.apache.kafka.clients.producer.internals.Sender  - Got error produce response with correlation id 1105790 on topic-partition catapult_on_entitlement_updates_prod-67, retrying (2147483643 attempts left). Error: NOT_LEADER_FOR_PARTITION

但日志显示重试尝试离开了,我很好奇为什么它没有重试呢?如果有人有任何想法,请告诉我?

共有1个答案

轩辕实
2023-03-14

需要注意两件事:

  1. 您正在制作的主题的复制系数是多少?所需的min.insync数是多少。复制品
  2. 你说的“制作人丢失了一些信息”是什么意思。如果制作人无法成功制作到#min.insync,则为制作人。它将抛出异常并失败(对于同步生产)。如果出现故障(同步或异步生产),则由生产商/客户机重试
 类似资料:
  • 我正在使用Docker启动一个kafka代理集群(例如,5个代理,每个容器一个代理)。Kafka版本2.12-0.11.0.0,动物园管理员3.4.10。 场景: null > 在独立模式下启动Zookeeper,然后启动kafka 创建主题 null 检查邮件 消息被累犯 null null server.properties(broker.id唯一,broker_ip:broker_port对

  • 我已经编写了一个Java程序,它利用了Kafka库,我听说Kafka制作人有内部缓冲区来保存消息,以便稍后重试。所以我创建了具有重试属性的幂等Kafka Producer。 在运行程序之前,我会关闭Kafka服务器(只有一个代理)。当我运行程序时,我遇到了一个异常“60000毫秒后更新元数据失败”。但是,当我重新启动Kafka服务器时,它应该将数据推送到Kafka主题,因为我已经给出了重试属性。

  • 我在单节点中运行kafka,我想看到kafka生产者行为,当我关闭我的kafka代理,然后我在几秒钟内重新启动我的代理,所以我创建Spring启动程序,我可能会发送1000个客户JSON对象并打印每次发送的JSON对象的偏移量。我的应用程序工作正常,但是当我关闭我的kafka代理并在几秒钟后我重新启动我的代理时,我的生产者返回以正常发送来自最新偏移量的对象。对于我的例子,当偏移量= 983在控制台

  • 我正在尝试从 kafka 主题中获取消息,并看到如果我将 auto.commit.reset 策略设置为“最早”,则所有消息都会得到正确处理。但是,如果设置为“最新”,则第一条消息将丢失,其余消息将得到正确处理。如果我在这里错过了什么,任何人都可以帮忙吗?

  • 我正在用SQS和JavaSDK发送和接收消息。几乎所有的消息都工作正常,但是其中一些丢失了,我不明白为什么。这是发送消息的代码: 以及接收代码(在循环中运行): 问题是,我能够接收到一些消息,但有些消息不是(总是相同类型的数据)。发送和接收的代码对于所有消息都是相同的。应用程序日志: 正在发送消息:{QueueUrl:https://sqs.us-east-1.amazonaws.com/0000

  • 本文向大家介绍Kafka中的消息是否会丢失和重复消费?相关面试题,主要包含被问及Kafka中的消息是否会丢失和重复消费?时的应答技巧和注意事项,需要的朋友参考一下 要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费。 1、消息发送 0---表示不进行消息接收是否成功的确认; 1---表示当Leader接收成功时确认; -1---表示Leader和Follower都接收成功