当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka-显示带有@重试的自定义错误消息

耿联
2023-03-14

我目前正在使用Spring Kafka来使用topic中的消息以及Spring的@Retry。因此,基本上,我正在尝试处理消费者消息以防出现错误。但在这样做时,我希望避免KafkaMessageListenerContainer抛出的异常消息。相反,我想显示一条自定义消息。我尝试在ConcurrentKafkAlisterContainerFactory中添加错误处理程序,但这样做时,我的重试没有被调用。是否有人可以指导我如何显示自定义异常消息以及重试方案?下面是我的代码片段:

ConcurrentKafkalistenerContainerFactorybean配置


@Bean
ConcurrentKafkaListenerContainerFactory << ? , ? > concurrentKafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory < Object, Object > kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory < Object, Object > kafkaListenerContainerFactory =
        new ConcurrentKafkaListenerContainerFactory < > ();
    configurer.configure(kafkaListenerContainerFactory, kafkaConsumerFactory);
    kafkaListenerContainerFactory.setConcurrency(1);

    // Set Error Handler
    /*kafkaListenerContainerFactory.setErrorHandler(((thrownException, data) -> {
        log.info("Retries exhausted);
    }));*/
    return kafkaListenerContainerFactory;
}

Kafka消费者

@KafkaListener(
    topics = "${spring.kafka.reprocess-topic}",
    groupId = "${spring.kafka.consumer.group-id}",
    containerFactory = "concurrentKafkaListenerContainerFactory"
)
@Retryable(
    include = RestClientException.class,
    maxAttemptsExpression = "${spring.kafka.consumer.max-attempts}",
    backoff = @Backoff(delayExpression = "${spring.kafka.consumer.backoff-delay}")
)
public void onMessage(ConsumerRecord < String, String > consumerRecord) throws Exception {
    // Consume the record
    log.info("Consumed Record from topic : {} ", consumerRecord.topic());


    // process the record
    messageHandler.handleMessage(consumerRecord.value());
}

共有2个答案

秦景福
2023-03-14

我的事件重试模板,

@Bean(name = "eventRetryTemplate")
public RetryTemplate eventRetryTemplate() {
RetryTemplate template = new RetryTemplate();

ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
policyMap.put(NonRecoverableException.class, new NeverRetryPolicy());
policyMap.put(RecoverableException.class, new AlwaysRetryPolicy());
retryPolicy.setPolicyMap(policyMap);

ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(backoffInitialInterval);
backOffPolicy.setMaxInterval(backoffMaxInterval);
backOffPolicy.setMultiplier(backoffMultiplier);

template.setRetryPolicy(retryPolicy);
template.setBackOffPolicy(backOffPolicy);
return template;
}

我的kafka侦听器使用重试模板,

@KafkaListener(
  groupId = "${kafka.consumer.group.id}",
  topics = "${kafka.consumer.topic}",
  containerFactory = "eventContainerFactory")
  public void eventListener(ConsumerRecord<String, String> 
  events,
  Acknowledgment acknowledgment) {
  eventRetryTemplate.execute(retryContext -> {
  retryContext.setAttribute(EVENT, "my-event");
  eventConsumer.consume(events, acknowledgment);
  return null;
  });
}

我的Kafka消费者财产,

private ConcurrentKafkaListenerContainerFactory<String, String> 
getConcurrentKafkaListenerContainerFactory(
  KafkaProperties kafkaProperties) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setAckOnError(Boolean.TRUE);
kafkaErrorEventHandler.setCommitRecovered(Boolean.TRUE);
factory.setErrorHandler(kafkaErrorEventHandler);
factory.setConcurrency(1); 
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(getEventConsumerFactory(kafkaProperties));
return factory;
}

kafka错误事件处理程序是我的自定义错误处理程序,它扩展了SeektocurInterrorHandler并实现了handle error方法,如下图所示。。。。。

@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> 
records,
  Consumer<?, ?> consumer, MessageListenerContainer container) {

log.info("Non recoverable exception. Publishing event to Database");
super.handle(thrownException, records, consumer, container);

ConsumerRecord<String, String> consumerRecord = (ConsumerRecord<String, 
String>) records.get(0);

FailedEvent event = createFailedEvent(thrownException, consumerRecord);

failedEventService.insertFailedEvent(event);

log.info("Successfully Published eventId {} to Database...", 
event.getEventId());
}

这里,failed event service是我的自定义类,它将这些失败事件放入一个可查询的关系数据库中(我选择它作为我的DLQ)。

丁星火
2023-03-14

您不应该使用Retryable(可重试)以及SeektocurInterrorHandler(从2.5开始,现在是默认版本;因此我假设您使用的是该版本)。

相反,请使用最大尝试次数、后退和可重试的异常配置自定义的SeekToCurrentErrorHandler。

该错误消息是正常的;由容器记录;通过在seektocurinterrorhandler上设置logLevel属性,可以将日志级别从ERROR降低到INFO或DEBUG。您还可以向其中添加自定义恢复程序,以便在重试结束后记录自定义消息。

 类似资料:
  • 我使用的是关于这个链接的教程: http://www.mkyong.com/spring-security/display-custom-error-message-in-spring-security/ 在登录表单上显示一个自定义错误消息,我在开始时就得到了它。但是在为自定义失败身份验证器处理程序声明authentication-failure-handler-ref=“myautherrorh

  • 问题内容: 假设我们有: 尝试输入错误的URL时,用户会看到一个普通的“找不到404页”。如何为这种情况返回自定义页面? 有关大猩猩/木糖的更新 对于使用纯net / http软件包的用户,可以接受答案。 如果您使用大猩猩/多路复用器,则应使用以下内容: 并根据需要实施。 问题答案: 我通常这样做: 在这里,我将代码简化为仅显示自定义404,但实际上使用此设置可以做更多的事情:我使用来处理所有HT

  • 如何在wordpress忍者表单插件中显示自定义错误消息。我已经搜索了忍者表格文件,但无法到达网站。默认情况下,错误消息显示为 我希望它是 提前道谢。

  • 我正在努力裁剪javax。验证。ConstraintValidator和javax。验证。根据我的需要限制ValidatorContext。我从格式错误的请求正文收到的响应消息始终采用以下形状: <代码> 此消息也以500而不是400错误请求的形式返回。我无法获得工作到解决方案来执行以下操作: 仅包括<代码> 我有以下代码: 向上面的代码发送格式错误的有效负载将导致如下消息: 我希望能够收到以下信

  • javascript中是否有方法使运行时在错误消息中使用不同的对象表示。 典型的错误消息 如果我们可以为对象提供比更好的表示,那将很有帮助。在其他语言中,您可以通过覆盖使其打印更好的对象表示。 然而,在这种情况下,重写toString似乎没有效果。

  • 安全测试人员声称,我应该清理返回的JSON(即转义这些符号),因为这可能会给旧的浏览器带来一些问题(即在浏览器中执行此JS代码)。 但是生成错误消息的是SpringBoot框架, 我在这里没有太多的控制权。 当然,我可以将参数定义为String,并自己进行验证,但我怀疑这是否是正确的方法。我的参数定义为Integer,我希望它保持这种方式。 做这件事最简单的方法是什么?