当前位置: 首页 > 知识库问答 >
问题:

当Kafka服务器关闭几分钟时,Kafka制作人正在丢失消息

张逸清
2023-03-14

我已经编写了一个Java程序,它利用了Kafka库,我听说Kafka制作人有内部缓冲区来保存消息,以便稍后重试。所以我创建了具有重试属性的幂等Kafka Producer。

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv(KafkaConstants.KAFKA_URL));
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put("linger.ms", 1000);
    props.put("acks", "all");
    props.put("request.timeout.ms",60000);
    props.put("retries",3);
    props.put("retry.backoff.ms",1000);
    props.put("max.in.flight.requests.per.connection",1);
    props.put("enable.idempotence",true);

在运行程序之前,我会关闭Kafka服务器(只有一个代理)。当我运行程序时,我遇到了一个异常“60000毫秒后更新元数据失败”。但是,当我重新启动Kafka服务器时,它应该将数据推送到Kafka主题,因为我已经给出了重试属性。

请在这方面提供帮助。

谢谢Priyam Saluja

共有2个答案

苏法
2023-03-14

我发现了问题所在,每次Kafka Producer尝试生成消息时,它都会首先更新元数据(以检查Kafka集群中的前导和分区)。如果无法获取信息,则会抛出错误,说明“60000毫秒后更新元数据失败”。

第二部分是重试,如果消息因暂时性错误而失败,Kafka制作人将重试。

步嘉德
2023-03-14

Kafka客户端发送的第一个请求之一是获取元数据。请记住,客户端试图连接到引导服务器列表中的代理,但它可能想要发送的主题可能不是其中之一。例如,考虑有3个代理B01、B02、B03和引导服务器只是B01,但生产者想要将消息发送到以B02为主题的主题分区:生产者需要第一元数据请求来获取该信息,然后打开与B02的连接来发送消息。我猜重试机制在这一步之后开始发挥作用,因为生产者内部的批处理利用了已知分区及其所在位置。您应该检查重试工作是否在获取元数据步骤正确完成后关闭服务器,并且生产者知道谁是分区负责人。

 类似资料:
  • 鉴于以下情况: 我在本地启动zookeeper和单个kafka代理,并创建“测试”主题,如kafka快速入门中所述:https://kafka.apache.org/quickstart 然后,我运行一个简单的java程序,该程序每秒向“测试”主题生成一条消息。一段时间后,我关闭了本地的kafka代理,看到制作人继续生成消息,它没有抛出任何异常。最后,我再次启动kafka broker,produ

  • Kafka客户:0.11.0.0-cp1Kafka经纪人: 在Kafka broker滚动重启时,我们的应用程序在发送到broker时丢失了一些消息。我相信滚动重启不应该丢失任何信息。以下是我们正在使用的生产者(将生产者与异步发送()一起使用,而不使用回调/未来等)设置: 我在日志中看到了这些例外 但日志显示重试尝试离开了,我很好奇为什么它没有重试呢?如果有人有任何想法,请告诉我?

  • 我通过以下代码将阿帕奇 Avro 格式的消息发送到 Kafka 代理实例: 代码工作正常,消息最终在Kafka中并被处理以最终在ImphxDB中。问题是每次发送操作都会产生大量INFO消息(客户端ID号就是一个例子): [生产者客户端 Id=生产者-27902] 关闭 Kafka 生产者超时Millis = 10000 毫秒。 [创建者客户端 Id=创建者-27902] 使用超时关闭 Kafka

  • 我正在尝试从 kafka 主题中获取消息,并看到如果我将 auto.commit.reset 策略设置为“最早”,则所有消息都会得到正确处理。但是,如果设置为“最新”,则第一条消息将丢失,其余消息将得到正确处理。如果我在这里错过了什么,任何人都可以帮忙吗?

  • 我有一个多线程应用程序,它使用producer类生成消息,之前我使用下面的代码为每个请求创建producer。其中KafkaProducer是新建的,每个请求如下: 然后我阅读了关于生产者的Kafka文档,并了解到我们应该使用单个生产者实例来获得良好的性能。 然后我在一个singleton类中创建了KafkaProducer的单个实例。 现在什么时候 或者我们如何在关闭后重新连接到生产者。问题是如

  • 你好,我正在使用Spring云流编写一个Kafka消费者生产者。在我的消费者内部,我将数据保存到数据库中,如果数据库出现故障,我将手动退出应用程序。重新启动应用程序后,如果数据库仍然关闭,则应用程序将再次停止。现在,如果我第三次重新启动应用程序,中间间隔(两次失败)收到的消息丢失,kafka 消费者会获取最新消息,也会跳过我退出代码的消息。 入站和出站通道绑定器接口 服务等级- 1)生产者服务 2