我试图写一个有弹性的Kafka消费者。如果在侦听器方法中处理消息时发生异常,我想重试。我想对某些异常重试几次,对于某些异常,从不对某些异常重试。我已经阅读了有关“搜索到当前错误处理者”的Spring文档,但不是100%确定如何实现它。
我已经子类化了ExceptionClatefierRetryPolicy,并根据Listener方法中发生的异常返回适当的重试策略。
我已经创建了RetryTemplate,并在子类中使用我的自定义实现设置了它的RetryPolicy。
我已在Kafka容器上设置了retryTemplate。我已将错误处理程序设置为new SeekToCurrentHandler,并将有状态重试属性设置为true。
侦听器方法
@KafkaListener(topics = "topicName", containerFactory = "containerFactory")
public void listenToKafkaTopic(@Payload Message<SomeAvroGeneratedClass> message, Acknowledgement ack){
SomeAvroGeneratedClass obj = message.getPayLoad();
processIncomingMessage(obj);
ack.acknowledge();
}
自定义重试策略类
@Component
public class MyRetryPolicy extends ExceptionClassifierRetryPolicy
{
@PostConstruct
public void init(){
final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(8);
this.setExceptionClassifier( classifiable ->
{
// Always Retry when instanceOf TransientDataAccessException
if( classifiable.getCause() instanceof TransientDataAccessException)
{
return new AlwaysRetryPolicy();
}
else if(classifiable.getCause() instanceOf NonTransientDataAccessException)
{
return new NeverRetryPolicy();
}
else
{
return simpleRetryPolicy;
}
} );
}}
重试模板和容器配置
@Configuration
public class RetryConfig{
@Bean
public RetryTemplate retryTemplate(@Autowired ConcurrentKafkaListenerContainerFactory factory){
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new MyRetryPolicy());
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
fixedBackOffPolicy.setBackOffPeriod(2000l);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
factory.setRetryTemplate(retryTemplate);
factory.setAckOnError(false);
factory.setErrorHandler(new SeekToCurrentErrorHandler());
factory.setStateFulRetry(true);
factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset
);
}
}
侦听器属性:
问题:
> < li>
使用我当前的代码,当AlwaysRetryPolicy返回时,我能否实现MyRetryPolicy中定义的重试逻辑,而不导致使用者重新平衡?如果没有,请指引我正确的道路。
我的方法在使用错误处理程序和重试方面是否正确?
有状态重试与<code>SeekToCurrentErrorHandler</code>是正确的方法。
但是,如果您使用的是最新版本(2.2或更高版本),则错误处理程序将在10次尝试后放弃;您可以通过将最大故障
设置为-1(2.2)或回退
以长.MAX_VALUE
(2.3或更高)来使无穷大。
在RetryTemplate里面,执行excuter方法是重试还是失败是由RetryPolicy决定的,这也是一个RetryContext工厂.这个RetryTemplate有责任使用当前的策略创建一个RetryContext并且把它注入到RetryCallback在每一次尝试中。回调失败后RetryTemplate必须由RetryPolicy决定使其更新状态(存储在RetryContext中),
我有一个带注释的kafka消费者方法@kafkalistener。我已经在容器上设置了重试模板,并且重试配置是这样的,如果在处理消息时发生了一些异常,它将始终重试。我已将最大轮询记录设置为1。如果这种情况实时发生,并且消费者一直在重试消息,经纪人会认为该消费者已经死亡并触发重新平衡吗?或者,在重试时,消费者是否会对未能处理的相同消息进行投票?如果这是真的,因为民意调查正在进行,我的假设是不会有任何
下面是execute()方法所做的事情
我试图测试使用自定义重试策略的重试模板。为了做到这一点,我使用以下示例: https://github.com/spring-projects/spring-retry/blob/master/src/test/java/org/springframework/retry/support/retrytemplatetests.java#l57 基本上,我的目标是在得到一些特定的http错误状态(例
我想用Spring集成创建一个简单的IntegrationFlow,但我遇到了一些困难。 我想创建一个集成流,从Rabbit Mq中的队列中获取消息并将消息发布到endpointRest。 我要处理的问题是,当一个请求失败时,它会继续无休止地重试,如何在这段代码中实现重试策略?例如,我想要3次重试,第一次重试在1秒后,第二次重试在5秒后,第三次重试在1分钟后。
当对骆驼使用Kafka组件时,从Kafka消费时有两种方法可以重试: null org.apache.kafka.clients.Consumer.internals.AbstractCoordinator[Consumer ClientID=Consumer-1,GroupID=2862121D-DDC9-4111-A96A-41BA376C0143]此成员将离开组,因为使用者轮询超时已过期。这