在我的侦听器中,在使用消息后,如果发生任何异常,我将抛出一个异常。如果它成功了,那么我承认。但是,即使抛出异常,偏移量也不会后退。i、 e重试没有按预期进行。错误事件不会再次出现。
此外,我发现我没有消费所有预期的消息。我做错什么了吗?
@Bean
public ConcurrentKafkaListenerContainerFactory<String,Result<FormatException,String>> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, Result<FormatException, String>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(this.consumerFactory());
factory.setErrorHandler(new SeekToCurrentErrorHandler(3));
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
public ConsumerFactory<String, Result<FormatException, String>> consumerFactory() {
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
监听器类
@KafkaListener(topics = "${gpc.consumer.topic}")
public void listenWithHeadersGPC(ConsumerRecord<String, Result<FormatException, String>> consumerRecord,
Acknowledgment acknowledgment) {
try {
String body = consumerRecord.value().get();
log.logInfo("Received message for " + GPCSourceName + ": " + body);
log.logInfo("consumption successful from kafka. partition="+consumerRecord.partition()+", "
+ "offset="+consumerRecord.offset());
if(body.contains("berlin+braze@gmail.com")) {
throw new RuntimeException("custom exception");
}
log.logInfo("Filtered message for " + GPCSourceName + " : " + body);
acknowledgment.acknowledge();
}catch(Exception e) {
log.logError("Exception occurred in listenWithHeadersGPC: ", e);
throw e;
}
}
我正在做确认。承认()
在我的侦听器中,如果没有异常
编辑:SpringKafka版本:2.2.8
工厂。setErrorHandler(新的SeekTocurErrorHandler(3))
在这种配置下,你会看到3次重试,两次之间的延迟为零;最后记录下来。默认情况下,“已恢复”记录的偏移量不会提交。
发布这样的问题时;指出您使用的版本。
从2.3版开始,您可以使用
确认。nack(睡眠时间)
;您还可以在seekTocurInterrorHandler
上调用setCommitRecovered(true)
,恢复记录的偏移量将被提交(使用手动\u IMMEDIATE
)。
只有当异常被抛出到容器时,这一切才会发生;如果在侦听器中捕捉到异常,则不会看到重试。
如果您根本看不到重试,并且您的侦听器正在抛出异常,那么您的侦听器或配置有问题。显示所有配置和
@KafkaListener
代码。
我能够使用ErrorDesrializationHandler成功处理反序列化错误,但当我重新启动我的消费者时,它再次开始重新处理由于反序列化而导致的所有失败消息。 由于反序列化异常无法到达Kafka Listener,如何确认并提交偏移量? 谢谢。 我正在使用的自定义错误处理程序: }
我有如下代码 每当我通过将其设置为实体来保存到我的数据库时,它会添加5:30作为偏移量
我想开始最新的抵消,而不是为旧的价值所困扰。是否有可能重置该组的偏移量?
我用Kafka和spring-布特: Kafka制作人班: Kafka-配置: 问题: 我有一个主题的5个分区,比方说。 发生的情况是,我获得成功(即消息成功发送到Kafka)日志,但是topic的无分区的偏移量增加。 正如您在上面看到的,我添加了日志和。我所期望的是,当Kafka不能发送消息给Kafka时,我应该得到一个错误,但在这种情况下,我没有收到任何错误消息。 Kafka的上述行为以的比例
问题内容: 使用javascript,我知道我的用户时区为UTC +3。 现在,我想用此知识创建DateTime对象: 我得到的回应是: 我究竟做错了什么?我该如何解决? 问题答案: 这个怎么样…
我有一个LocalDateTime的实例。 我需要映射它XMLGregorianCalendar(这里使用JAXB ),在最终的XML中,我希望时间在XML文档中看起来像下面这样:2020-03-04T19:45:00.000 1:00 (1小时是UTC的偏移量)。 我尝试过使用DateTimeFormatter将LocalDateTime转换为字符串,然后将其映射到XMLGregorianCal