我目前正在使用Spring Kafka来使用topic中的消息以及Spring的@Retry。因此,基本上,我正在尝试处理消费者消息以防出现错误。但在这样做时,我希望避免KafkaMessageListenerContainer抛出的异常消息。相反,我想显示一条自定义消息。我尝试在ConcurrentKafkAlisterContainerFactory中添加错误处理程序,但这样做时,我的重试没有被调用。是否有人可以指导我如何显示自定义异常消息以及重试方案?下面是我的代码片段:
ConcurrentKafkalistenerContainerFactorybean配置
@Bean
ConcurrentKafkaListenerContainerFactory << ? , ? > concurrentKafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory < Object, Object > kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory < Object, Object > kafkaListenerContainerFactory =
new ConcurrentKafkaListenerContainerFactory < > ();
configurer.configure(kafkaListenerContainerFactory, kafkaConsumerFactory);
kafkaListenerContainerFactory.setConcurrency(1);
// Set Error Handler
/*kafkaListenerContainerFactory.setErrorHandler(((thrownException, data) -> {
log.info("Retries exhausted);
}));*/
return kafkaListenerContainerFactory;
}
Kafka消费者
@KafkaListener(
topics = "${spring.kafka.reprocess-topic}",
groupId = "${spring.kafka.consumer.group-id}",
containerFactory = "concurrentKafkaListenerContainerFactory"
)
@Retryable(
include = RestClientException.class,
maxAttemptsExpression = "${spring.kafka.consumer.max-attempts}",
backoff = @Backoff(delayExpression = "${spring.kafka.consumer.backoff-delay}")
)
public void onMessage(ConsumerRecord < String, String > consumerRecord) throws Exception {
// Consume the record
log.info("Consumed Record from topic : {} ", consumerRecord.topic());
// process the record
messageHandler.handleMessage(consumerRecord.value());
}
我的事件重试模板,
@Bean(name = "eventRetryTemplate")
public RetryTemplate eventRetryTemplate() {
RetryTemplate template = new RetryTemplate();
ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
policyMap.put(NonRecoverableException.class, new NeverRetryPolicy());
policyMap.put(RecoverableException.class, new AlwaysRetryPolicy());
retryPolicy.setPolicyMap(policyMap);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(backoffInitialInterval);
backOffPolicy.setMaxInterval(backoffMaxInterval);
backOffPolicy.setMultiplier(backoffMultiplier);
template.setRetryPolicy(retryPolicy);
template.setBackOffPolicy(backOffPolicy);
return template;
}
我的kafka侦听器使用重试模板,
@KafkaListener(
groupId = "${kafka.consumer.group.id}",
topics = "${kafka.consumer.topic}",
containerFactory = "eventContainerFactory")
public void eventListener(ConsumerRecord<String, String>
events,
Acknowledgment acknowledgment) {
eventRetryTemplate.execute(retryContext -> {
retryContext.setAttribute(EVENT, "my-event");
eventConsumer.consume(events, acknowledgment);
return null;
});
}
我的Kafka消费者财产,
private ConcurrentKafkaListenerContainerFactory<String, String>
getConcurrentKafkaListenerContainerFactory(
KafkaProperties kafkaProperties) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setAckOnError(Boolean.TRUE);
kafkaErrorEventHandler.setCommitRecovered(Boolean.TRUE);
factory.setErrorHandler(kafkaErrorEventHandler);
factory.setConcurrency(1);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(getEventConsumerFactory(kafkaProperties));
return factory;
}
kafka错误事件处理程序是我的自定义错误处理程序,它扩展了SeektocurInterrorHandler并实现了handle error方法,如下图所示。。。。。
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>>
records,
Consumer<?, ?> consumer, MessageListenerContainer container) {
log.info("Non recoverable exception. Publishing event to Database");
super.handle(thrownException, records, consumer, container);
ConsumerRecord<String, String> consumerRecord = (ConsumerRecord<String,
String>) records.get(0);
FailedEvent event = createFailedEvent(thrownException, consumerRecord);
failedEventService.insertFailedEvent(event);
log.info("Successfully Published eventId {} to Database...",
event.getEventId());
}
这里,failed event service是我的自定义类,它将这些失败事件放入一个可查询的关系数据库中(我选择它作为我的DLQ)。
您不应该使用Retryable(可重试)以及SeektocurInterrorHandler(从2.5开始,现在是默认版本;因此我假设您使用的是该版本)。
相反,请使用最大尝试次数、后退和可重试的异常配置自定义的SeekToCurrentErrorHandler。
该错误消息是正常的;由容器记录;通过在seektocurinterrorhandler上设置logLevel属性,可以将日志级别从ERROR降低到INFO或DEBUG。您还可以向其中添加自定义恢复程序,以便在重试结束后记录自定义消息。
我使用的是关于这个链接的教程: http://www.mkyong.com/spring-security/display-custom-error-message-in-spring-security/ 在登录表单上显示一个自定义错误消息,我在开始时就得到了它。但是在为自定义失败身份验证器处理程序声明authentication-failure-handler-ref=“myautherrorh
问题内容: 假设我们有: 尝试输入错误的URL时,用户会看到一个普通的“找不到404页”。如何为这种情况返回自定义页面? 有关大猩猩/木糖的更新 对于使用纯net / http软件包的用户,可以接受答案。 如果您使用大猩猩/多路复用器,则应使用以下内容: 并根据需要实施。 问题答案: 我通常这样做: 在这里,我将代码简化为仅显示自定义404,但实际上使用此设置可以做更多的事情:我使用来处理所有HT
如何在wordpress忍者表单插件中显示自定义错误消息。我已经搜索了忍者表格文件,但无法到达网站。默认情况下,错误消息显示为 我希望它是 提前道谢。
我正在努力裁剪javax。验证。ConstraintValidator和javax。验证。根据我的需要限制ValidatorContext。我从格式错误的请求正文收到的响应消息始终采用以下形状: <代码> 此消息也以500而不是400错误请求的形式返回。我无法获得工作到解决方案来执行以下操作: 仅包括<代码> 我有以下代码: 向上面的代码发送格式错误的有效负载将导致如下消息: 我希望能够收到以下信
javascript中是否有方法使运行时在错误消息中使用不同的对象表示。 典型的错误消息 如果我们可以为对象提供比更好的表示,那将很有帮助。在其他语言中,您可以通过覆盖使其打印更好的对象表示。 然而,在这种情况下,重写toString似乎没有效果。
安全测试人员声称,我应该清理返回的JSON(即转义这些符号),因为这可能会给旧的浏览器带来一些问题(即在浏览器中执行此JS代码)。 但是生成错误消息的是SpringBoot框架, 我在这里没有太多的控制权。 当然,我可以将参数定义为String,并自己进行验证,但我怀疑这是否是正确的方法。我的参数定义为Integer,我希望它保持这种方式。 做这件事最简单的方法是什么?