我正在实现spring kafka批处理侦听器,它读取来自kafka主题的消息列表,并将数据发布到REST服务。我想了解在REST服务停止的情况下的偏移管理,不应该提交批处理的偏移,应该为下一次轮询处理消息。我已经阅读了spring kafka文档,但在理解侦听器错误处理程序和批量查找当前容器错误处理程序之间的区别时存在困惑。我使用的是spring-boot-2.0.0。M7及以下版本是我的代码。
Listener Config:
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(Integer.parseInt(env.getProperty("spring.kafka.listener.concurrency")));
// factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setBatchErrorHandler(kafkaErrorHandler());
factory.getContainerProperties().setAckMode(AckMode.BATCH);
factory.setBatchListener(true);
return factory;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap-servers"));
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
env.getProperty("spring.kafka.consumer.enable-auto-commit"));
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
env.getProperty("spring.kafka.consumer.auto-commit-interval"));
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("spring.kafka.session.timeout"));
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("spring.kafka.consumer.group-id"));
return propsMap;
}
Listener Class:
@KafkaListener(topics = "${spring.kafka.consumer.topic}", containerFactory = "kafkaListenerContainerFactory")
public void listen(List<String> payloadList) throws Exception {
if (payloadList.size() > 0)
//Post to the service
}
Kafka Error Handler:
public class KafkaErrorHandler implements BatchErrorHandler {
private static Logger LOGGER = LoggerFactory.getLogger(KafkaErrorHandler.class);
@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
LOGGER.info("Exception occured while processing::" + thrownException.getMessage());
}
}
如何处理Kafka listener,以便在处理一批记录时发生问题,我不会丢失数据。
使用Apache Kafka,我们永远不会丢失数据。分区日志中确实存在一个偏移量,可以搜索到任意位置。
另一方面,当我们使用分区中的记录时,不需要提交它们的偏移量——当前的使用者将状态保存在内存中。我们只需要为同一群体中的其他新消费者做出promise,而当前消费者已经死亡。与错误无关,当前使用者总是继续轮询其当前内存偏移量后面的新数据。
因此,要在同一消费者中重新处理相同的数据,我们必须使用seek
操作将消费者移回所需位置。这就是为什么Spring Kafka引入了seekToCurInterrorHandler
:
这允许实现查找所有未处理的主题/分区,以便在下次轮询时检索当前记录(以及剩余的其他记录)。seekToCurInterrorHandler
就是这样做的。
https://docs.spring.io/spring-kafka/reference/htmlsingle/#_seek_to_current_container_error_handlers
我有一个kafkalistener,可以一次监听一批消息,如下所示 我的问题是,有没有一种方法可以监听多批消息并只提交一次。例如,如果我在Kafka主题中有1000条消息,我希望以10批的形式一次听100条消息,并在处理10批消息后提交偏移量。
我有一个spring kafka消费者,它读取记录并将其交给缓存。计划的任务将定期清除缓存中的记录。我只想在批成功保存到数据库中后更新提交偏移量。我尝试将Acknowledgement对象传递给缓存服务以调用acknowledge方法,如下所示。 确认模式设置如下: 自动提交是错误的: 即使调用了确认方法,提交偏移量也不会更新。持久化记录后更新提交偏移量的最佳方法是什么? 我正在使用spring-
请帮助我决定这是否是正确的方法。我使用的不是Spring kafka 2.x,而是1.2.x。另外,目前我已经将auto.commit.offset属性设置为false,并将ackMode设置为manual。
我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?
我使用MANUAL_IMMEDIATEack模式,Spring-kafka 1.3.9(不能更改为Java8),并在监听器代码中完成处理时提交偏移量。我使用自定义反序列化器及其工作正常,除非我遇到反序列化异常。然后Kafka卡住了。我已经处理了这个由Deserializer,喜欢而不是抛出异常(当反序列化异常发生)我得到一个反序列化对象的新实例,并设置原始消息(导致反序列化异常)在一个字段(异常数
我能够使用ErrorDesrializationHandler成功处理反序列化错误,但当我重新启动我的消费者时,它再次开始重新处理由于反序列化而导致的所有失败消息。 由于反序列化异常无法到达Kafka Listener,如何确认并提交偏移量? 谢谢。 我正在使用的自定义错误处理程序: }