当前位置: 首页 > 知识库问答 >
问题:

如何配置DeadLetterPublisherRecoverer以在Spring Cloud Stream批处理模式下向DLQ发送错误消息

孟振
2023-03-14

我已经使用Spring Cloud Stream和Spring Cloud函数创建了一个Kafka消费者,用于以批处理模式从Kafka主题消费消息。现在,我想将错误批处理发送到死信队列以进一步调试错误。

我正在使用Spring重试处理消费者方法中的重试。但对于不可重试的异常,我希望将整个批次发送到DLQ。

这是我的消费者的样子:

@Bean
public Consumer<List<GenericRecord>> consume() {
    return (message) -> {
        processMessage(message);  
    }
}

错误处理配置如下所示:

@Autowired
private DefaultErrorHandler errorHandler;

ListenerContainerCustomizer<AbstractMessageListenerContainer> c = new ListenerContainerCustomizer<AbstractMessageListenerContainer>() {
      @Override
      public void configure(AbstractMessageListenerContainer container, String destinationName, String group) {
        container.setCommonErrorHandler(errorHandler);
      }
}

使用DeadRec总计PublishinRecoverer启用错误处理程序以将失败的消息发送到DLQ:

@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, Details> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
        (cr, e) -> new TopicPartition("error.topic.name", 0)),
        new FixedBackOff(0, 0));
}

但这并没有向error.topic发送任何消息,从错误日志中我可以看到它正在尝试连接到localhost:9092,而不是我在spring.cloud.stream.kafka.binder.brokers中提到的代理。

如何配置 DLQ 提供程序以从应用程序属性读取 Kafka 元数据

还有没有办法配置供应商函数来创建DLQ提供程序?

共有1个答案

萧秋月
2023-03-14

您可能正在使用 Boot 自动配置的 KafkaTemplate

请使用< code > spring . Kafka . bootstrap-servers ,如果没有< code > spring . cloud . stream . Kafka . binder . brokers ,绑定器将使用它;这样,绑定器和模板都将连接到同一个代理。

您必须抛出BatchListnerFailedExc的来指示批处理中的哪一条记录失败。

 类似资料:
  • 尝试将Spring配置为在使用批处理模式时向死信队列发送错误消息。但由于dlq主题中没有任何内容。 我使用Spring Boot 2.5.3和Spring Cloud 2020.0.3。这会自动将spring cloud stream binder kafka父版本解析为3.1.3。 这是申请表。属性: 这是函数式编程模型中的应用程序和批处理侦听器: 发送到主题: 从DLQ阅读: 因此,运行此应用

  • 我正在尝试用rabbit活页夹配置一个Spring-Cloud-Stream应用程序 下面是我的配置: 我的消费者java代码: 当没有错误发生时,所有工作正常。但当我模拟一个异常时,我得到了以下异常: 当兔子绑定器尝试将消息发送回 DLQ 时,会引发此错误。 事实上,错误消息有效负载包含一个 函数中使用< code>StreamBridge发送消息时,我也遇到了同样的问题。如果发送功能失败,我不

  • 我试着为下一个spring cloud流版本准备我们的应用程序。(当前使用3.0.0.rc1)。用Kafka的活页夹。 现在我们收到一个消息,处理它,并将它重新发送到另一个主题。单独处理每个消息会导致对数据库的大量单个请求。 在3.0.0版本中,我们希望以批处理的方式处理消息,这样我们就可以在批更新中保存数据。 在当前版本中,我们使用了@enablebinding、@streamlistener

  • 我正在使用spring cloud stream阅读来自Kafka主题的消息。正在从队列中读取并处理消息,如果消息在处理时失败,则该消息应进入配置的错误队列,但会出现以下错误。 从消息中提取标题时出现异常,解决此问题的最佳方法是什么? kafka版本为1.0,kafka客户端为2.11-1.0

  • 如何在批处理模式的情况下处理反序列化异常? 我正在使用Spring boot-2.3.8版本的Spring kafka。 尝试过此选项: 但它抛出了一个异常:由java引起。lang.IllegalStateException:错误处理程序必须是ErrorHandler,而不是org。springframework。Kafka。听众。请参阅OcuCurrentBatchErrorHandler 以

  • 我有一个ASP.NET Core1.0Web API应用程序,并试图弄清楚如果我的控制器调用的函数出错,如何将异常消息传递给客户端。 我确实看到了一些使用的文档,但是为了使用它,我必须安装compat shim。在Core1.0中有没有一种新的方法来做这些事情? 这是我一直在尝试的垫片,但它不起作用: 当抛出时,我查看客户端,在内容中找不到我正在发送的消息。