我将sping-boot(2.1.6.RELEASE)与sping-kafka(2.2.7.RELEASE)一起使用,并且我使用KafkaTemplate向我的kafka集群发送消息。但是有时(通常是当我重新启动kafka代理或进行重新平衡时),我在发送消息时会看到这样的错误:
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
由于默认的Kafka生产者配置,我期望发送失败重试,但他们没有。默认Kafka生成器配置:
retries: 2147483647 (https://kafka.apache.org/documentation/#retries)
acks: 1 (https://kafka.apache.org/documentation/#acks)
我的配置是这样的:
@Bean
public Map<String, Object> producerConfigs()
{
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
return props;
}
@Bean
public ProducerFactory<Long, String> producerFactory()
{
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<Long, String> kafkaTemplate(KafkaTemplateProducerListener<Long, String> kafkaTemplateProducerListener,
ProducerFactory<Long, String> producerFactory)
{
KafkaTemplate<Long, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setProducerListener(kafkaTemplateProducerListener);
return kafkaTemplate;
}
我发出这样的信息:
kafkaTemplate.send(topicName, key, body);
我在互联网上搜索了所有的人,每个人都说这种带有重试和acks的配置必须有效,但它没有。我错过了什么?
谢啦
经过一段时间的调试,我找到了解决方案:
props.put(ProducerConfig.ACKS_CONFIG, "all");
有关此属性的更多信息:https://kafka.apache.org/documentation/#acks
非常好的博客,展示了在kafka中可能丢失消息的不同场景:
旁注-根据这个答案,我发现如果您不想在关机时丢失消息,最好使用这个:
@PreDestroy
public void flush()
{
kafkaTemplate.flush();
}
我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?
生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f
我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:
鉴于以下情况: 我在本地启动zookeeper和单个kafka代理,并创建“测试”主题,如kafka快速入门中所述:https://kafka.apache.org/quickstart 然后,我运行一个简单的java程序,该程序每秒向“测试”主题生成一条消息。一段时间后,我关闭了本地的kafka代理,看到制作人继续生成消息,它没有抛出任何异常。最后,我再次启动kafka broker,produ
一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,
我正在使用Docker启动一个kafka代理集群(例如,5个代理,每个容器一个代理)。Kafka版本2.12-0.11.0.0,动物园管理员3.4.10。 场景: null > 在独立模式下启动Zookeeper,然后启动kafka 创建主题 null 检查邮件 消息被累犯 null null server.properties(broker.id唯一,broker_ip:broker_port对