当前位置: 首页 > 知识库问答 >
问题:

如何在反应器kafka中重试失败的消费者记录

林浩漫
2023-03-14

我正在尝试使用reamer-kafka来消耗消息。其他一切都很好,但我想为失败的消息添加重试(2)。spring-kafka已经默认重试失败记录3次,我想使用reamer-kafka实现相同。

我用SpringKafka作为反应Kafka的包装。以下是我的消费者模板:

reactiveKafkaConsumerTemplate
                .receiveAutoAck()
                .map(ConsumerRecord::value)
                .flatMap(this::consumeWithRetry)
                .onErrorContinue((error, value)->log.error("something bad happened while consuming : {}", error.getMessage()))
                .retryWhen(Retry.backoff(30, Duration.of(10, ChronoUnit.SECONDS)))
                .subscribe();

让我们考虑消耗方法如下

public Mono<Void> consume(MessageRecord message){
   return Mono.error(new RuntimeException("test retry"); //sample error scenario
}

我使用以下逻辑在失败时重试消耗方法。

public Mono<Void> consumeWithRetry(MessageRecord message){
   return consume(message)
          .retry(2);
}

如果当前消费者记录异常失败,我想重试使用该消息。我试图用另一次重试(3)来包装consume方法,但这并没有达到目的。最后一次retryWhen仅用于重试Kafka再平衡订阅。

@simon-baslé@加里-罗素

共有1个答案

公孙慎之
2023-03-14

之前在重试时,我使用了以下方法:

public Mono<Void> consumeWithRetry(MessageRecord message){
   return consume(message)
          .retry(2);
}

但它没有重试。添加Mono.defer后,上面的代码工作并添加所需的重试。

public Mono<Void> consumeWithRetry(MessageRecord message){
   return Mono.defer(()->consume(message))
          .retry(2);
}
 类似资料:
  • 我正在手动启动Zoomaster,然后是Kafka服务器,最后是Kafka-Rest服务器及其各自的属性文件。接下来,我正在tomcat上部署我的Spring Boot应用程序 在Tomcat日志跟踪中,我得到了错误org。springframework。上下文ApplicationContextException:无法启动bean的组织。springframework。Kafka。配置。inte

  • 我正处于探索Kafka0.8.1.1版本的初始阶段。 使用API进行触发器再平衡 将kafka配置为等待消费者活动一段时间,并假设它被不优雅地关闭,自动重新平衡。 这里的问题是,分配给死亡使用者的分区中的所有消息都保留在队列中,并且在重新平衡发生之前不会被处理。

  • 本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费

  • 我的处理来自的消息。周期性地,按摩无法处理,消费者抛出异常。不管怎样,消费者还是会做出补偿。在Kafka中,我能区分成功消息和失败消息吗?我想,我不能。这是真的吗?如果这是真的,我有一个主要问题: 如何重试失败消息?我知道一些方法,但我不确定它们是否正确。 1) 将“偏移”更改为“提前”。但通过这种方式,成功消息也会重试。 2) 当我捕捉到异常时,我会将此消息发送到另一个主题(例如错误主题)。但这

  • 我想描述以下场景:我有一个节点。js后端应用程序(它使用单线程事件循环)。这是系统的总体架构:Producer- 假设制作者向Kafka发送了一条消息,这条消息的目的是在数据库中进行某个查询并检索查询结果。但是,众所周知Kafka是一个异步系统。如果制作者向Kafka发送消息,它会得到一个响应,表明该消息已被Kafka经纪人接受。Kafka broker不会等到消费者轮询消息并处理它。 在这种情况

  • 问题是Spring Kafka侦听器只配置了主题名。 我似乎可以让Kafka产生100个消费者来处理来自“队列”(日志)的消息。怎么能做到呢?