当前位置: 首页 > 知识库问答 >
问题:

Kafka制作人Timeout异常:即将到期的1条记录

党宇定
2023-03-14

我在用Kafka搭配Spring靴:

Kafka制作人班:

@Service
public class MyKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);

    // Send Message
    public void sendMessage(String topicName, String message) throws Exception {
        LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
            }
        });
    }
}

Kafka配置:

spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=100000
spring.kafka.producer.request.timeout.ms=30000
spring.kafka.producer.linger.ms=10
spring.kafka.producer.acks=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.max.block.ms=5000
spring.kafka.bootstrap-servers=192.168.1.161:9092,192.168.1.162:9093

假设我在主题我的测试主题中发送了10次1000条消息

10次中有8次我成功地在我的消费者中获取所有消息,但有时我会收到以下错误:

2017-10-05 07:24:11,[ERROR][my service-LoggingProducerListener-onError:76]发送key='null'和payload='{“deviceType”:“X”,“deviceKeys”:[{“apiKey”:“X-X-o”}],“devices…”主题为我的测试主题时引发异常

org。阿帕奇。Kafka。常见的错误。TimeoutException:自批创建加上延迟时间后,my-test-topic-4的1条记录因30024毫秒而过期


共有3个答案

邴墨竹
2023-03-14

我通过正确处理主机spring解决了这个问题。Kafka。引导服务器及其DNS。即使网络解析IP地址,它似乎也需要DNS。

云昊阳
2023-03-14

>

  • 错误的第一条线索是30024 ms已通过-配置spring。Kafka。制作人要求暂停。ms=30000相关。这30秒的等待是为了填满生产者端的缓冲区。

    当消息发布时,它会在生产者端得到缓冲,并等待30秒(见上面的配置)填满<代码>Spring。Kafka。制作人batch size=100000表示100KB,因此,如果消息接收负载较低,并且缓冲区在30秒内没有填充到100KB的更多消息,您可能会看到此消息。

    spring.kafka.producer.linger.ms=10用于摄入负载较高且生产者希望限制对Kafka代理的发送()调用。这是批处理准备就绪后(即缓冲区填充到100KB的批处理大小后)生产者在向代理发送消息之前等待的时间。

    解决方案:

    • 增加linger.ms以便在批处理就绪后保留消息更长时间。如果需要更多时间来填充批处理,请增加request.timeout.ms
    • 另一种方法:减少批处理大小,或增加request.timeout.ms,或两者兼而有之。

  • 苏季同
    2023-03-14

    有三种可能性:

    1. 增加request.timeout.ms-这是Kafka在缓冲区中等待整个批次准备就绪的时间。因此,在您的情况下,如果缓冲区中的消息少于100,000条,将发生超时。更多信息请参见:https://stackoverflow.com/a/34794261/2707179
    2. 减少批处理大小-与上一点相关,它将更频繁地发送批处理,但它们将包含更少的消息。
    3. 根据消息大小,您的网络可能无法赶上高负载?检查您的吞吐量是否不是瓶颈。
     类似资料:
    • 2018-04-19 15:12:57[kafka-producer-network-thread producer-1]错误O.s.K.s.LoggingProducerListener-向主题xxxxx-v1:org.apache.kafka.common.errors.TimeoutException:自批处理创建后已超过xxxxx-v1-3:60043毫秒的过期1条记录,有效负载=“{79

    • 创建了一个群集,其中有两个代理使用相同的动物园管理员,并试图为主题生成消息,其详细信息如下。 当生产者设置或-1时,,它应该接收代理(领导者和副本)的确认,但当一个代理在制作时手动关闭时,即使在acks=“all”有人能解释这种奇怪行为的原因时,对Kafka制作人也没有任何影响? 经纪人在9091,9092。 下面是Kafka制作人的源代码

    • 我正在考虑创建一个独立的Kafka生产者,它作为守护进程运行,通过套接字接收消息,并将其可靠地发送给Kafka。 但是,我决不能是第一个想到这个想法的人。这样做的目的是避免使用PHP或Node编写Kafka生成器,而只是通过套接字将消息从这些语言传递到独立的守护进程,这些语言负责传递,而主应用程序则一直在做自己的事情。 此守护进程应负责在发生中断时进行重试传递,并充当服务器上运行的所有程序的传递点

    • 我想让我的Kafka制作人变得富有交易性。我正在发送10条消息。如果发生任何错误,则不应向Kafka发送任何消息,即无或全部。 我使用的是Spring Boot KafkaTemplate。 我正在发送文件中提到的10条信息,如下所示。应发送9条消息,且I消息大小超过1MB,由于 https://docs.spring.io/spring-kafka/reference/html/#using-K

    • 我有一个单节点,多(3)代理Zookeeper/Kafka设置。我使用的是Kafka0.10 Java客户端。 我写了以下simple remote(在不同于Kafka的服务器上)Producer(在代码中,我用MYIP替换了我的公共IP地址): 这3个代理的服务器属性如下所示(在3个不同的服务器属性文件中,Broker.ID为0、1、2、listeners为plaintext://:9092、p

    • 在我的本地系统中,我已经启动了一个单独的Kafka实例,旁边还有动物园管理员。Zookeper和kafka服务器都运行在默认端口上。 我创建了一个主题“test”,复制因子为1,因为我只有一个kafka实例正在运行。 同时,我还创建了两个分区。 但是当我使用java kafka-client jar创建一个生产者时,即使我对消息使用不同的键,生产者也会将所有消息推送到同一个分区,因为所有消息都是在