当前位置: 首页 > 知识库问答 >
问题:

如何为Kafka消费者实现始终重试策略/自定义重试策略,而不会导致组中的重新平衡

鲜于阳
2023-03-14

我试图写一个有弹性的Kafka消费者。如果在侦听器方法中处理消息时发生异常,我想重试。我想对某些异常重试几次,对于某些异常,从不对某些异常重试。我已经阅读了有关“搜索到当前错误处理者”的Spring文档,但不是100%确定如何实现它。

我已经子类化了ExceptionClatefierRetryPolicy,并根据Listener方法中发生的异常返回适当的重试策略。

我已经创建了RetryTemplate,并在子类中使用我的自定义实现设置了它的RetryPolicy。

我已在Kafka容器上设置了retryTemplate。我已将错误处理程序设置为new SeekToCurrentHandler,并将有状态重试属性设置为true。

侦听器方法

@KafkaListener(topics = "topicName", containerFactory = "containerFactory")
   public void listenToKafkaTopic(@Payload Message<SomeAvroGeneratedClass> message, Acknowledgement ack){
      SomeAvroGeneratedClass obj = message.getPayLoad();
      processIncomingMessage(obj);
      ack.acknowledge();
   }

自定义重试策略类

 @Component
 public class MyRetryPolicy extends ExceptionClassifierRetryPolicy
   {
      @PostConstruct
       public void init(){
             final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
             simpleRetryPolicy.setMaxAttempts(8);

     this.setExceptionClassifier( classifiable ->
           {
        // Always Retry when instanceOf   TransientDataAccessException
       if( classifiable.getCause() instanceof TransientDataAccessException)
            {
               return new AlwaysRetryPolicy();                                         

            }
      else if(classifiable.getCause() instanceOf NonTransientDataAccessException)
           {
              return new NeverRetryPolicy();
           }
          else
            {
            return simpleRetryPolicy;
            }

      } );
 }}

重试模板和容器配置

@Configuration
public class RetryConfig{


 @Bean
 public RetryTemplate retryTemplate(@Autowired ConcurrentKafkaListenerContainerFactory factory){

   RetryTemplate retryTemplate = new RetryTemplate();
   retryTemplate.setRetryPolicy(new MyRetryPolicy());
   FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
   fixedBackOffPolicy.setBackOffPeriod(2000l);
   retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

   factory.setRetryTemplate(retryTemplate);
   factory.setAckOnError(false);
   factory.setErrorHandler(new SeekToCurrentErrorHandler());
   factory.setStateFulRetry(true);
   factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset
   );

  }
}

侦听器属性:

  1. AckMode=手动
  2. auto.offset.commit=false

问题:

> < li>

使用我当前的代码,当AlwaysRetryPolicy返回时,我能否实现MyRetryPolicy中定义的重试逻辑,而不导致使用者重新平衡?如果没有,请指引我正确的道路。

我的方法在使用错误处理程序和重试方面是否正确?

共有1个答案

堵宪
2023-03-14

有状态重试与<code>SeekToCurrentErrorHandler</code>是正确的方法。

但是,如果您使用的是最新版本(2.2或更高版本),则错误处理程序将在10次尝试后放弃;您可以通过将最大故障设置为-1(2.2)或回退长.MAX_VALUE(2.3或更高)来使无穷大。

 类似资料:
  • 在RetryTemplate里面,执行excuter方法是重试还是失败是由RetryPolicy决定的,这也是一个RetryContext工厂.这个RetryTemplate有责任使用当前的策略创建一个RetryContext并且把它注入到RetryCallback在每一次尝试中。回调失败后RetryTemplate必须由RetryPolicy决定使其更新状态(存储在RetryContext中),

  • 我有一个带注释的kafka消费者方法@kafkalistener。我已经在容器上设置了重试模板,并且重试配置是这样的,如果在处理消息时发生了一些异常,它将始终重试。我已将最大轮询记录设置为1。如果这种情况实时发生,并且消费者一直在重试消息,经纪人会认为该消费者已经死亡并触发重新平衡吗?或者,在重试时,消费者是否会对未能处理的相同消息进行投票?如果这是真的,因为民意调查正在进行,我的假设是不会有任何

  • 下面是execute()方法所做的事情

  • 我试图测试使用自定义重试策略的重试模板。为了做到这一点,我使用以下示例: https://github.com/spring-projects/spring-retry/blob/master/src/test/java/org/springframework/retry/support/retrytemplatetests.java#l57 基本上,我的目标是在得到一些特定的http错误状态(例

  • 我想用Spring集成创建一个简单的IntegrationFlow,但我遇到了一些困难。 我想创建一个集成流,从Rabbit Mq中的队列中获取消息并将消息发布到endpointRest。 我要处理的问题是,当一个请求失败时,它会继续无休止地重试,如何在这段代码中实现重试策略?例如,我想要3次重试,第一次重试在1秒后,第二次重试在5秒后,第三次重试在1分钟后。

  • 当对骆驼使用Kafka组件时,从Kafka消费时有两种方法可以重试: null org.apache.kafka.clients.Consumer.internals.AbstractCoordinator[Consumer ClientID=Consumer-1,GroupID=2862121D-DDC9-4111-A96A-41BA376C0143]此成员将离开组,因为使用者轮询超时已过期。这