我已经使用Spring Cloud Stream和Spring Cloud函数创建了一个Kafka消费者,用于以批处理模式从Kafka主题消费消息。现在,我想将错误批处理发送到死信队列以进一步调试错误。
我正在使用Spring重试处理消费者方法中的重试。但对于不可重试的异常,我希望将整个批次发送到DLQ。
这是我的消费者的样子:
@Bean
public Consumer<List<GenericRecord>> consume() {
return (message) -> {
processMessage(message);
}
}
错误处理配置如下所示:
@Autowired
private DefaultErrorHandler errorHandler;
ListenerContainerCustomizer<AbstractMessageListenerContainer> c = new ListenerContainerCustomizer<AbstractMessageListenerContainer>() {
@Override
public void configure(AbstractMessageListenerContainer container, String destinationName, String group) {
container.setCommonErrorHandler(errorHandler);
}
}
使用DeadRec总计PublishinRecoverer启用错误处理程序以将失败的消息发送到DLQ:
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, Details> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(cr, e) -> new TopicPartition("error.topic.name", 0)),
new FixedBackOff(0, 0));
}
但这并没有向error.topic发送任何消息,从错误日志中我可以看到它正在尝试连接到localhost:9092,而不是我在spring.cloud.stream.kafka.binder.brokers
中提到的代理。
如何配置 DLQ 提供程序以从应用程序属性
读取 Kafka 元数据?
还有没有办法配置供应商
函数来创建DLQ提供程序?
您可能正在使用 Boot 自动配置的 KafkaTemplate
。
请使用< code > spring . Kafka . bootstrap-servers ,如果没有< code > spring . cloud . stream . Kafka . binder . brokers ,绑定器将使用它;这样,绑定器和模板都将连接到同一个代理。
您必须抛出BatchListnerFailedExc的
来指示批处理中的哪一条记录失败。
尝试将Spring配置为在使用批处理模式时向死信队列发送错误消息。但由于dlq主题中没有任何内容。 我使用Spring Boot 2.5.3和Spring Cloud 2020.0.3。这会自动将spring cloud stream binder kafka父版本解析为3.1.3。 这是申请表。属性: 这是函数式编程模型中的应用程序和批处理侦听器: 发送到主题: 从DLQ阅读: 因此,运行此应用
我正在尝试用rabbit活页夹配置一个Spring-Cloud-Stream应用程序 下面是我的配置: 我的消费者java代码: 当没有错误发生时,所有工作正常。但当我模拟一个异常时,我得到了以下异常: 当兔子绑定器尝试将消息发送回 DLQ 时,会引发此错误。 事实上,错误消息有效负载包含一个 函数中使用< code>StreamBridge发送消息时,我也遇到了同样的问题。如果发送功能失败,我不
我试着为下一个spring cloud流版本准备我们的应用程序。(当前使用3.0.0.rc1)。用Kafka的活页夹。 现在我们收到一个消息,处理它,并将它重新发送到另一个主题。单独处理每个消息会导致对数据库的大量单个请求。 在3.0.0版本中,我们希望以批处理的方式处理消息,这样我们就可以在批更新中保存数据。 在当前版本中,我们使用了@enablebinding、@streamlistener
我正在使用spring cloud stream阅读来自Kafka主题的消息。正在从队列中读取并处理消息,如果消息在处理时失败,则该消息应进入配置的错误队列,但会出现以下错误。 从消息中提取标题时出现异常,解决此问题的最佳方法是什么? kafka版本为1.0,kafka客户端为2.11-1.0
如何在批处理模式的情况下处理反序列化异常? 我正在使用Spring boot-2.3.8版本的Spring kafka。 尝试过此选项: 但它抛出了一个异常:由java引起。lang.IllegalStateException:错误处理程序必须是ErrorHandler,而不是org。springframework。Kafka。听众。请参阅OcuCurrentBatchErrorHandler 以
我有一个ASP.NET Core1.0Web API应用程序,并试图弄清楚如果我的控制器调用的函数出错,如何将异常消息传递给客户端。 我确实看到了一些使用的文档,但是为了使用它,我必须安装compat shim。在Core1.0中有没有一种新的方法来做这些事情? 这是我一直在尝试的垫片,但它不起作用: 当抛出时,我查看客户端,在内容中找不到我正在发送的消息。