当前位置: 首页 > 知识库问答 >
问题:

代理关闭时,Kafka producer正在丢失消息

韩高峯
2023-03-14

鉴于以下情况:

我在本地启动zookeeper和单个kafka代理,并创建“测试”主题,如kafka快速入门中所述:https://kafka.apache.org/quickstart

然后,我运行一个简单的java程序,该程序每秒向“测试”主题生成一条消息。一段时间后,我关闭了本地的kafka代理,看到制作人继续生成消息,它没有抛出任何异常。最后,我再次启动kafka broker,producer可以重新连接到broker并继续生成消息,但是,在kafka broker宕机期间生成的所有消息都丢失了。当检测到健康的Kafka经纪人时,制作人不会重播它们。

我怎样才能防止这种情况?我希望kafka producer在检测到kafka broker重新联机时重播这些消息。这是我的制作人配置:

props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("linger.ms", 0);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

共有2个答案

严扬
2023-03-14

因为您只运行一个代理,所以当您的代理关闭时,恐怕您将无法存储消息。

然而,奇怪的是,当你让你的经纪人下台时,你没有得到任何异常/警告/错误。

我预计会出现“更新元数据失败”或“消息过期”错误,因为当生产者将消息发送到引导过程中提到的代理时。servers属性,它首先与zookeeper检查活动控制器(或引线)和分区。因此,在您的情况下,因为您在独立模式下运行Kafka,并且当代理关闭时,制作人不应该收到领导信息并出错。

请检查以下属性设置为什么:

request.timeout.ms
max.block.ms

并利用这些价值观(减少,可能是)?然后检查结果?

您可能想要尝试的另一个选项是以同步方式向Kafka发送消息(在消息被接收之前阻止发送()方法),这里有一个可能有帮助的代码片段(取自此留档参考):

如果你想模拟一个简单的阻塞调用,你可以立即调用get()方法:

byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();

在这种情况下,如果消息因任何原因未成功发送,kafka应引发异常。

我希望这有帮助。

田兴旺
2023-03-14

Kafka Producer库内置了重试机制,但默认情况下已关闭。将retriesProducer-config更改为大于0(默认值)的值以启用它。您还应该尝试重试。退避。ms请求。timetout。ms以自定义生产者重试。

示例Kafka生产者配置与启用重试:

retries=2147483647         //Integer.MAX_VALUE 
retry.backoff.ms=1000
request.timeout.ms=305000  //5 minutes
max.block.ms=2147483647    //Integer.MAX_VALUE 

您可以在Apache Kafka留档中找到有关这些属性的更多信息。

 类似资料:
  • 我已经编写了一个Java程序,它利用了Kafka库,我听说Kafka制作人有内部缓冲区来保存消息,以便稍后重试。所以我创建了具有重试属性的幂等Kafka Producer。 在运行程序之前,我会关闭Kafka服务器(只有一个代理)。当我运行程序时,我遇到了一个异常“60000毫秒后更新元数据失败”。但是,当我重新启动Kafka服务器时,它应该将数据推送到Kafka主题,因为我已经给出了重试属性。

  • 我将sping-boot(2.1.6.RELEASE)与sping-kafka(2.2.7.RELEASE)一起使用,并且我使用KafkaTemplate向我的kafka集群发送消息。但是有时(通常是当我重新启动kafka代理或进行重新平衡时),我在发送消息时会看到这样的错误: 由于默认的Kafka生产者配置,我期望发送失败重试,但他们没有。默认Kafka生成器配置: 我的配置是这样的: 我发出这

  • 寻找设计我的Kafka消费者的最佳方法。基本上,我想看看什么是避免数据丢失的最佳方法,以防在处理消息期间出现任何异常/错误。 我的用例如下。 a)我使用SERVICE来处理消息的原因是 - 将来我计划编写一个ERROR处理器应用程序,该应用程序将在一天结束时运行,它将尝试再次处理失败的消息(不是所有消息,而是由于任何依赖项(如父级缺失)而失败的消息)。 b)我想确保没有消息丢失,所以我会将消息保存

  • 如何找到我的文件的位置? 我正在运行一个UbuntuLinux服务器从亚马逊网络服务EC2(弹性计算云),我找不到我的Apache配置。

  • 我是java新手。我正在尝试编写一个java代理。在使用演示应用程序运行时,它会失败,并显示以下消息: 下面是代码和pom文件:https://gist.github.com/proywm/f27b72d01aa66afc7f0f8c098e33e914 你能告诉我如何解决这个问题吗?

  • 全部的 这里有一条简单的路线: JsonValidator是一个简单的Javabean,我在其中扩展了处理器。在这里,我想确保在我继续使用Jackson散集调用以将JSON散集到我的POJO之前,所有必需的字段都被传入。 我现在在豆子里做的只是一行: 只需调用exchange.getIn().getBody(String.class),就会导致路由中的下一个(解组)步骤抛出一个错误,表示没有要解组