当前位置: 首页 > 知识库问答 >
问题:

使用启用批处理模式的Spring Cloud Stream在Kafka中实现DLQ

苏凯
2023-03-14

我正在尝试使用启用批处理模式的spring cloud stream实现DLQ

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BatchErrorHandler handler) {
        return ((container, destinationName, group) -> {
              if(dlqEnabledTopic.contains(destinationName))
                                    container.setBatchErrorHandler(handler);});
    }

    @Bean
    public BatchErrorHandler batchErrorHandler(KafkaOperations<String, byte[]> kafkaOperations) {
        CustomDeadLetterPublishingRecoverer recoverer = new CustomDeadLetterPublishingRecoverer(kafkaOperations,
                (cr, e) -> new TopicPartition(cr.topic()+"_dlq", cr.partition()));
        return new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(1000, 1));
    }

但有一些疑问:

>

  • 如何使用属性配置键/值序列化程序-我的消息是String类型,但KafkaOperations使用的是ByteArraySerializer

    在批处理中,有多个消息,但如果第一条消息失败,它会转到DLQ,但看不到下一条消息的处理。

    要求-如果批处理失败,我只需要将该消息发送到DLQ,并且应该再次处理消息的其余部分。

  • 共有1个答案

    易奇希
    2023-03-14

    >

  • spring.kafka.producer.*属性-但是,DLT发布应该使用与主流应用相同的序列化程序。ByteArraySeriezer通常是正确的。

    恢复的批处理错误处理程序将对未处理的记录执行查找,它们将被返回。调试日志记录应该可以帮助您找出问题所在。如果您无法弄清楚,请提供一个显示您正在看到的行为的MCRE。

    不活页夹不支持批量模式的DLQ;配置错误处理程序是正确的方法。

  •  类似资料:
    • 在前面的章节中,你交互式地使用mysql输入查询并且查看结果。你也可以以批模式运行mysql。为了做到这些,把你想要运行的命令放在一个文件中,然后告诉mysql从文件读取它的输入: shell> mysql < batch-file 如果在Windows下运行mysql,并且文件中有一些可以造成问题的特殊字符,可以这样操作: C:\> mysql -e "source batch-file" 如果

    • 一些批处理任务可以使用spring batch现成的组件完全的组装.例如ItemReader和ItemWriter实现可配置覆盖范围广泛的场景,然而,对于大多数情况下,必须编写自定义代码。应用程序开发人员的主要API人口点是Tasklet,ItemReader ItemWriter和各种各样的监听器接口.最简单的批处理任务能够使用Spring BatchItemReader现成的输出,但通常情况下

    • 几个月前,我用Spring Batch制作了一个项目。 该项目工作正常,包括JobExecution决策器的实现 这使fini仅与Spring Batch配合使用。 现在我必须将其用于Spring Boot批处理。在决策步骤之前,所有流程都运行良好。其中,我返回了良好的FlowExecutionStatus,但我不知道为什么,作业以“失败”状态完成。 有人知道为什么不工作了? 谢谢

    • 现在我正在用Apache Kafka做一些测试。在Kafka生产者的配置中,参数batch.size和linger.ms控制批处理策略。是否可以在生产的同时动态地制作这些参数?例如。如果数据摄取率上升很快,我们可能希望增加batch.size以每批积累更多的消息。我没有找到任何动态批处理与Kafka生产者的例子。有没有可能实施?

    • 在前面的章节,我们使用mysql交互模式输入语句和查看结果。我们也可以以批处理的模式来运行mysql。要达到这种目的,需要把我们想运行的语句放在一个文件里,然后告诉mysql从文件读取它的输入。 shell> mysql < batch-file 如果你是在Windows下运行mysql,文件里的一些特定字符会引起一些问题,你要这样做: C:\> mysql -e "source batch-f

    • 考虑一个阶跃豆: 要求:在Reader中,它从文件中读取(Entity1的)记录。在处理器中,它进行处理,在Writer中,它写入数据库。 在TaskExecutor之前,只创建了一个线程,它将在读取器和处理器中循环1000次,如上面的块设置中所定义的。然后它将移动到writer并写入所有1000条记录。它将再次从记录编号1001开始,然后在读取器和处理器中处理另外1000条记录。这是一个同步执行