当前位置: 首页 > 知识库问答 >
问题:

如何重试来自Kafka的失败消息?

韩彦君
2023-03-14

我的Spring Boot应用程序(消费者)处理来自Apache Kafka的消息。周期性地,按摩无法处理,消费者抛出异常。不管怎样,消费者还是会做出补偿。在Kafka中,我能区分成功消息和失败消息吗?我想,我不能。这是真的吗?如果这是真的,我有一个主要问题:

如何重试失败消息?我知道一些方法,但我不确定它们是否正确。

1) 将“偏移”更改为“提前”。但通过这种方式,成功消息也会重试。

2) 当我捕捉到异常时,我会将此消息发送到另一个主题(例如错误主题)。但这看起来很难。

3)其他东西(你的变体)

共有3个答案

万俟英锐
2023-03-14

您可以在您的消费者配置中进行以下更改:

>

  • 启用。汽车提交=错误

    RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(retryInterval);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(retryMaxCount));
    
        return retryTemplate;
    }
    

    在您的kafkaListenerContainerFactory

    setretryTemplate(retryTemplate);
    factory.getContainerProperties().setAckOnError(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    

    现在,在代码中,只要在异常发生时在使用者中抛出异常即可。这不会在发生异常时更新偏移量。它将在retryInterval时间后重试,最长时间为maxRetryCount

    如果您想忽略某些类型的异常,而不是重试,请创建一个异常映射,如下图所示,并将其传递到SimpleRetryPolicy()中,如下图所示:

    Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
            exceptionMap.put(IllegalArgumentException.class, false);
            exceptionMap.put(TimeoutException.class, true);
    

    有关更多详细信息,请访问此链接:Kafka错误处理

  • 琴俊良
    2023-03-14

    使用SeekToCurInterrorHandler。它将重新定位偏移量以重播消息(默认情况下为10次,但可配置)。

    重试用尽后,它调用可以执行某些操作的“恢复者”,例如DeadLetterPublishingRecoverer。

    闽念
    2023-03-14

    如果您希望至少保证一次,一般模式如下:

    >

  • 禁用自动提交(将enable.auto.commit设置为false)
  • 消费信息
  • 对于每条消息:

    • 如果没有错误,则提交偏移量
    • 如果出现错误,请重试任意次数
    • 如果成功,请提交
    • 如果要放弃,请登录或发布到错误队列(用于分析或稍后重试)

    重复

  •  类似资料:
    • 我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。

    • 我正在尝试使用reamer-kafka来消耗消息。其他一切都很好,但我想为失败的消息添加重试(2)。spring-kafka已经默认重试失败记录3次,我想使用reamer-kafka实现相同。 我用SpringKafka作为反应Kafka的包装。以下是我的消费者模板: 让我们考虑消耗方法如下 我使用以下逻辑在失败时重试消耗方法。 如果当前消费者记录异常失败,我想重试使用该消息。我试图用另一次重试(

    • 基本上,我的用例是在HttpOutboundGateway请求中出现401时重试http请求。该请求来自jms代理,进入集成流。 标头使用适当的http标头进行了丰富,然后我有一个建议,使用默认的简单策略重试请求,RequestHandlerAdvice方法的问题是,它将HandlerRecovery流中的异常消息默认为非HttpException类(MessageException),因此我无法

    • 有人能帮我弄清楚这件事吗。 谢了!

    • 我希望使用动态接受主题作为查询参数的成功/失败响应来响应RESTendpoint。在带有小型反应式消息传递的Quakus中,代码看起来就像下面用OutgoingKafkaRecordMetadata包装有效负载一样 即https://myendpoint/PublishToKafka?主题=myDynamicTopic 从Quarkus doco“如果endpoint没有返回CompletionS

    • 我对Kafka很陌生,对它有一些疑问。我已经配置了一个kafka消费者来消费来自主题的消息,并且我有不同类型的事件进入主题。f、 e、。 我想配置不同的kafka监听器来消费不同类型的事件。我认为有两种方法可以做到这一点,比如使用字符串(json)格式的事件,转换成事件对象,在不同类型之间切换,执行业务逻辑,或者配置不同的kafka监听器工厂 因此,第一种方法不是 ,对于第二种方法,我需要创建许多