当前位置: 首页 > 知识库问答 >
问题:

Spring云流中的批处理模式和自定义错误处理

金健
2023-03-14

在我开始使用Spring Cloud Stream之前,我使用的是Spring-Kafka及其对批量消费和自定义错误处理的支持。请注意这段代码的最后两行:

        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(this.consumerConfigurationProperties.getConsumer().get(TOPIC_AG_TASK_EMPP).getConcurrency());
    factory.setConsumerFactory(consumerFactory);
    factory.setMessageConverter(avroMessageConverter);
    factory.getContainerProperties().setPollTimeout(this.consumerConfigurationProperties.getConsumer().get(TOPIC_AG_TASK_EMPP).getPollTimeout());
    factory.getContainerProperties().setPauseEnabled(true);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    factory.getContainerProperties().setErrorHandler(dlqAGTaskErrorHandler);

然而,对于Spring Cloud Stream,我找不到如何配置它。我只能找到这些配置属性:

Spring、响铃、水流、kafka.bindings.inputconsumer。autoCommitOffset,启用Dlq

因此,在Spring Cloud Stream中注册一个自定义错误处理程序并将AckMode设置为BATCH是否可能?

感谢您的支持。

共有1个答案

景温书
2023-03-14

目前,我们还不支持Spring Cloud Stream级别的这些选项。以下问题应在实施后提供等效选项(可能尽快Chelsea.RC1):

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/70

https://github.com/spring-cloud/spring-cloud-stream/issues/538

 类似资料:
  • 我正在使用Spring Cloud Stream和Kafka Binder批量消费来自一个Kafka主题的消息。我正在尝试实现一个错误处理机制。根据我的理解,我不能在批处理模式下使用Spring Cloud Stream的< code>enableDLQ属性。 我找到了和,以重试并从spring-kafka文档发送失败消息。但我无法理解如何按照功能编程标准将记录发送到自定义DLQ主题。我看到的所有

  • 所有的错误最终都会被 Tango.ErrHandler 进行处理。 你可以自定义你的错误处理方式来替代默认的。例如: var ( prefix = "<html><head>tango</head><body><div>" suffix = fmt.Sprintf("</div><div>version: %s</div></body></html>", tango.Version

  • 404和500错误客户端和服务端都会通过error.js组件处理。如果你想改写它,则新建_error.js在文件夹中: import React from 'react' export default class Error extends React.Component { static getInitialProps({ res, err }) { const statusCod

  • 修改日志 2019-07-30 优化了 logger.go,日志新增了返回数据。 调用 alarm.WeChat("错误信息") alarm.Email("错误信息") alarm.Sms("错误信息") alarm.Panic("错误信息") 运行 下载源码后,请先执行 dep ensure 下载依赖包! 效果 {"time":"2019-07-23 22:55:27","alarm":

  • 修改日志 2019-07-30 优化了 logger.go,日志新增了返回数据。 调用 alarm.WeChat("错误信息") alarm.Email("错误信息") alarm.Sms("错误信息") alarm.Panic("错误信息") 运行 下载源码后,请先执行 dep ensure 下载依赖包! 效果 {"time":"2019-07-23 22:55:27","alarm":

  • 概述 开始今天的文章,为什么要自定义错误处理?默认的错误处理方式是什么? 那好,咱们就先说下默认的错误处理。 默认的错误处理是 errors.New("错误信息"),这个信息通过 error 类型的返回值进行返回。 举个简单的例子: func hello(name string) (str string, err error) { if name == "" { err =