当前位置: 首页 > 知识库问答 >
问题:

在批处理模式下使用Spring Cloud Stream发送到DLQ失败

海鸣
2023-03-14

尝试将Spring配置为在使用批处理模式时向死信队列发送错误消息。但由于dlq主题中没有任何内容。

我使用Spring Boot 2.5.3和Spring Cloud 2020.0.3。这会自动将spring cloud stream binder kafka父版本解析为3.1.3。

这是申请表。属性:

spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.bindings.input-in-0.destination=topic4
spring.cloud.stream.bindings.input-in-0.group=batch4
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=5 

这是函数式编程模型中的应用程序和批处理侦听器:

@SpringBootApplication
public class DemoKafkaBatchErrorsApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoKafkaBatchErrorsApplication.class, args);
    }

    @Bean
    public Consumer<List<byte[]>> input() {
        return messages -> {

            for (int i = 0; i < messages.size(); i++) {

                throw new BatchListenerFailedException("Demo: failed to process = ", i);
            }
        };
    }

    @Bean
    public RecoveringBatchErrorHandler batchErrorHandler(KafkaTemplate<String, byte[]> template) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        return new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(2L, 10));
    }
}

发送到主题:

./kafka-console-producer.sh --broker-list broker:9092 --topic topic4 < input.json

从DLQ阅读:

./kafka-console-consumer.sh --bootstrap-server broker:9092 --topic topic4_ERR --from-beginning --max-messages 100

因此,运行此应用程序后,我在dlq主题中什么都没有,但在控制台中有许多消息,如:

Caused by: org.springframework.kafka.listener.BatchListenerFailedException: Demo: failed to process =  @-0
    at com.example.demokafkabatcherrors.DemoKafkaBatchErrorsApplication.lambda$input$0(DemoKafkaBatchErrorsApplication.java:29) ~[classes/:na]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:854) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:643) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:489) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:77) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:727) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:560) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.2.jar:5.5.2]
    ... 27 common frames omitted

我做错了什么?

UPD:根据加里的回答,我做了这些改变:

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BatchErrorHandler handler) {
        return ((container, destinationName, group) -> container.setBatchErrorHandler(handler));
    }

    @Bean
    public BatchErrorHandler batchErrorHandler(KafkaOperations<String, byte[]> kafkaOperations) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaOperations,
                (cr, e) -> new TopicPartition(cr.topic() + "_ERR", 0));
        return new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(2L, 3));
    }

一切都很有魅力

共有1个答案

路雅懿
2023-03-14

使用spring cloud stream时,容器不是由Boot的容器工厂创建的,而是由binder创建的;错误处理程序Bean不会自动连接。

您必须配置一个ListenerContainerCustomizer@Bean

这里的示例:使用Spring Cloud Stream Kafka 3.0.3时,我可以应用优雅的关闭吗?释放?

 类似资料:
  • 在前面的章节中,你交互式地使用mysql输入查询并且查看结果。你也可以以批模式运行mysql。为了做到这些,把你想要运行的命令放在一个文件中,然后告诉mysql从文件读取它的输入: shell> mysql < batch-file 如果在Windows下运行mysql,并且文件中有一些可以造成问题的特殊字符,可以这样操作: C:\> mysql -e "source batch-file" 如果

  • 我已经使用Spring Cloud Stream和Spring Cloud函数创建了一个Kafka消费者,用于以批处理模式从Kafka主题消费消息。现在,我想将错误批处理发送到死信队列以进一步调试错误。 我正在使用Spring重试处理消费者方法中的重试。但对于不可重试的异常,我希望将整个批次发送到DLQ。 这是我的消费者的样子: 错误处理配置如下所示: 使用DeadRec总计PublishinRe

  • 我正在尝试使用启用批处理模式的spring cloud stream实现DLQ 但有一些疑问: > 如何使用属性配置键/值序列化程序-我的消息是String类型,但KafkaOperations使用的是ByteArraySerializer 在批处理中,有多个消息,但如果第一条消息失败,它会转到DLQ,但看不到下一条消息的处理。 要求-如果批处理失败,我只需要将该消息发送到DLQ,并且应该再次处理

  • 11.1 日志项处理和失败 一个常见的用例是需要在一个步骤中特殊处理错误,chunk-oriented步骤(从创建工厂bean的这个步骤)允许用户实现一个简单的ItemReadListener用例,用来监听读入错误,和一个ItemWriteListener,用来监听写出错误.下面的代码片段说明一个监听器监听失败日志的读写: >public class ItemFailureLoggerListen

  • 一些批处理任务可以使用spring batch现成的组件完全的组装.例如ItemReader和ItemWriter实现可配置覆盖范围广泛的场景,然而,对于大多数情况下,必须编写自定义代码。应用程序开发人员的主要API人口点是Tasklet,ItemReader ItemWriter和各种各样的监听器接口.最简单的批处理任务能够使用Spring BatchItemReader现成的输出,但通常情况下

  • 我在spark streaming应用程序中看到一些失败的批处理,原因是与内存相关的问题,如 无法计算拆分,找不到块输入-0-1464774108087