当前位置: 首页 > 知识库问答 >
问题:

处理消息时重试次数有限的Kafka使用者

隆钊
2023-03-14

我很难在Kafka主题的消费者中找到处理异常的简单模式。场景如下:在消费者中,我调用一个外部服务。如果服务不可用,我想重试几次,然后停止消费。

最简单的模式似乎是一种处理它的阻塞同步方式,在Java中如下所示:

ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
    boolean processed=false;
    int count=0;
    while (!processed) {
        try {
            callService(..); 
        } catch (Exception e) {
            if (count++ < 3) {
                Thread.sleep(5000);
                continue;
            } else throw new RuntimeException();
        }
    }
}

但是,我觉得必须有一种更简单的方法(不使用第三方库),并且避免阻塞线程。

这似乎是我们想要的一种常见的东西,但我找不到一个简单的例子来说明这种模式。

共有1个答案

胡志
2023-03-14

Kafaka提供的这种开箱即用的再审机制并不存在。使用RabbitMQ的经验,其中MQ提供重试交换。这些交换在RabbitMQ中称为死信交换

https://www.rabbitmq.com/dlx.html

你可以在Kafaka的例子中应用同样的模式。

void consumeMainTopicWithPostponedRetry() {
    while (true) {
        Message message = takeNextMessage("main_topic");
        try {
            process(message);
        } catch (Exception ex) {
            publishTo("retry_topic");
            LOGGER.warn("Message processing failure. Will try once again in the future.", ex);
        }
    }
}
void consumeRetryTopic() {
    while (true) {
        Message message = takeNextMessage("retry_topic");
        try {
            process(message);
            waitSomeLongerTime();
        } catch (Exception ex) {
            publishTo("failed_topic");
            LOGGER.warn("Message processing failure. Will skip it.", ex);
        }
    }
}

以上的策略和例子摘自下面的链接。所有的功劳都要归功于博文的主人。https://blog.pragmatists.com/retring-consumer-architecture-in-the-apache-kafka-939AC4CB851A

对于以上所做的无阻塞方式,可以通过阅读整篇博文来理解。希望这有帮助。

 类似资料:
  • 如果每个Kafka消息属于一个特定的会话,如何管理会话关联,以便同一个Spark执行器看到链接到一个会话的所有消息? 如何确保属于会话的消息被Spark executor按照在Kafka中报告的顺序处理?我们能以某种方式实现这一点而不对线程计数施加限制并导致处理开销(如按消息时间戳排序)吗? 何时检查会话状态?在执行器节点崩溃的情况下,如何从最后一个检查点恢复状态?在驱动程序节点崩溃的情况下,如何

  • 我们一直在使用SI Kafka进行一个新项目,并取得了很大成功。在最近的一次切换之前,我们使用KafkaTopicOffsetManager来管理我们的消费者主题偏移量。为了避免每个消费者/主题对都有额外的主题,并使用Burrow或lag监控,我们决定使用最新的KafkaNativeOffsetManager,它使用Kafka提供的本机偏移管理。但在切换之后,我们注意到目标主题的消息消耗持续滞后。

  • 我正在使用来使用来自spring-boot应用程序中某个主题的消息,我需要定期运行该应用程序。spring-kafka版本是2.2.4.发行版。

  • 我们正在应用程序中使用apache kafka streams 0.10.2.0。我们利用kafka streams拓扑将处理后的数据传递到下一个主题,直到处理结束。 此外,我们使用AWS ECS容器来部署消费者应用程序。我们观察到消费者正在拾取非常旧的消息进行处理,尽管它们已经在更早的时候处理过。这个问题在服务扩展/缩减或新部署时随机发生。我知道在消费者重新平衡时,有些消息可以重新处理。但在这种

  • 我正在使用Kafka活页夹的Spring Cloud Stream。它工作得很好,但客户端接收到重复的消息。已经尝试了所有Kafka消费属性,但没有结果。 在我的应用程序示例中检查2个类-Aggregate Application和EventFilterApplication。如果我运行EventFilterApplication-只有1条消息,如果是Aggregate Application-2

  • 在本公司的最后一个项目中:客户提出身份验证等请求,应用程序第一层得到客户请求并在Kafka上生成消息,核心服务消费该消息后向银行服务提出rest请求,得到响应后在Kafka上生成响应消息,应用程序第一层将消息传递给客户。是真的Kafka用例,还是去掉第一层和Kafka,在客户端和核心之间使用rest服务更好。谢谢