我的Spring Boot应用程序(消费者)
处理来自Apache Kafka
的消息。周期性地,按摩无法处理,消费者抛出异常。不管怎样,消费者还是会做出补偿。在Kafka中,我能区分成功消息和失败消息吗?我想,我不能。这是真的吗?如果这是真的,我有一个主要问题:
如何重试失败消息?我知道一些方法,但我不确定它们是否正确。
1) 将“偏移”更改为“提前”。但通过这种方式,成功消息也会重试。
2) 当我捕捉到异常时,我会将此消息发送到另一个主题(例如错误主题)。但这看起来很难。
3)其他东西(你的变体)
您可以在您的消费者配置中进行以下更改:
>
启用。汽车提交=错误
RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(retryInterval);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(retryMaxCount));
return retryTemplate;
}
在您的kafkaListenerContainerFactory
:
setretryTemplate(retryTemplate);
factory.getContainerProperties().setAckOnError(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
现在,在代码中,只要在异常发生时在使用者中抛出异常即可。这不会在发生异常时更新偏移量。它将在retryInterval
时间后重试,最长时间为maxRetryCount
。
如果您想忽略某些类型的异常,而不是重试,请创建一个异常映射,如下图所示,并将其传递到SimpleRetryPolicy()
中,如下图所示:
Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
exceptionMap.put(IllegalArgumentException.class, false);
exceptionMap.put(TimeoutException.class, true);
有关更多详细信息,请访问此链接:Kafka错误处理
使用SeekToCurInterrorHandler。它将重新定位偏移量以重播消息(默认情况下为10次,但可配置)。
重试用尽后,它调用可以执行某些操作的“恢复者”,例如DeadLetterPublishingRecoverer。
如果您希望至少保证一次,一般模式如下:
>
enable.auto.commit
设置为false)对于每条消息:
重复
我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。
我正在尝试使用reamer-kafka来消耗消息。其他一切都很好,但我想为失败的消息添加重试(2)。spring-kafka已经默认重试失败记录3次,我想使用reamer-kafka实现相同。 我用SpringKafka作为反应Kafka的包装。以下是我的消费者模板: 让我们考虑消耗方法如下 我使用以下逻辑在失败时重试消耗方法。 如果当前消费者记录异常失败,我想重试使用该消息。我试图用另一次重试(
基本上,我的用例是在HttpOutboundGateway请求中出现401时重试http请求。该请求来自jms代理,进入集成流。 标头使用适当的http标头进行了丰富,然后我有一个建议,使用默认的简单策略重试请求,RequestHandlerAdvice方法的问题是,它将HandlerRecovery流中的异常消息默认为非HttpException类(MessageException),因此我无法
有人能帮我弄清楚这件事吗。 谢了!
我希望使用动态接受主题作为查询参数的成功/失败响应来响应RESTendpoint。在带有小型反应式消息传递的Quakus中,代码看起来就像下面用OutgoingKafkaRecordMetadata包装有效负载一样 即https://myendpoint/PublishToKafka?主题=myDynamicTopic 从Quarkus doco“如果endpoint没有返回CompletionS
我对Kafka很陌生,对它有一些疑问。我已经配置了一个kafka消费者来消费来自主题的消息,并且我有不同类型的事件进入主题。f、 e、。 我想配置不同的kafka监听器来消费不同类型的事件。我认为有两种方法可以做到这一点,比如使用字符串(json)格式的事件,转换成事件对象,在不同类型之间切换,执行业务逻辑,或者配置不同的kafka监听器工厂 因此,第一种方法不是 ,对于第二种方法,我需要创建许多