当前位置: 首页 > 知识库问答 >
问题:

Spring集成kafka出站适配器错误处理

裴星洲
2023-03-14

我如何处理在Spring整合中未能向Kafka传达的信息?

我在“int kafka:outbound channel adapter”中没有看到“error channel”是一个选项,我想知道应该在哪里添加错误通道信息,以便我的ErrorHandler可以获得“failed to kafka”类型的错误。(包括所有类型的故障、配置、网络等)

此外,inputToKafka是排队通道,我应该在哪里添加错误通道来处理潜在的队列完全错误?

<int:gateway id="myGateway" 
            service-interface="someGateway" 
            default-request-channel="transformChannel" 
            error-channel="errorChannel"  
            default-reply-channel="replyChannel" 
            async-executor="MyThreadPoolTaskExecutor"/>

<int:transformer id="transformer" input-channel="transformChannel" method="transform" output-channel="inputToKafka">
    <bean class="Transformer"/>  
</int:transformer>

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    message-key-expression="'bar'"
                                    partition-id-expression="2">
    <int:poller fixed-delay="200" time-unit="MILLISECONDS" receive-timeout="0"
                    task-executor="kafkaExecutor"/>
</int-kafka:outbound-channel-adapter>

<bean id="kafkaExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    ....
</bean>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

<int:service-activator input-channel='errorChannel' output-channel="replyChannel" method='process'>
    <bean class="ErrorHandler"/>
</int:service-activator>

编辑

<property name="producerListener">
    <bean id="producerListener" class="org.springframework.kafka.support.ProducerListenerAdapter"/>
</property>

共有1个答案

郜光明
2023-03-14

下游流上的任何错误都将被发送到网关上的错误通道。但是,由于Kafka在默认情况下是异步的,因此不会出现任何错误。您可以在出站适配器上设置sync=true,然后如果出现问题,将引发异常。

但请记住,这会慢得多。

通过向KafkaTemplate添加ProducerListener可以获得异步异常。

 类似资料:
  • 基础知识: 使用带集成的Spring 4.1.1,引导和1.0.0的DSL。 多个入站SFTP适配器在不同的时间表上从不同的供应商获取文件。 每个集成流在文件下载后将标头附加到消息中,以标识供应商源。 使用MessagePublishingErrorHandler处理异常。 标准消息流在消息处理成功或消息未能完成时通知外部监控解决方案。使用消息头来识别哪个流失败。 在我们收到消息后,成功和错误流都

  • 使用Spring Integration Kafka,使用出站通道适配器,我尝试向名为“test”的主题发送消息 通过命令行终端,我启动了动物园管理员、kafka并创建了名为“test”的主题 Spring XML配置 JUnit测试代码 测试用例成功,在调试时,我发现channel.send()返回true 我使用下面的命令通过命令行检查了主题,但是我在测试主题中看不到任何消息。 bin/kaf

  • 我正在开发一个Spring集成应用程序,我有一个地图列表,我需要将其插入到表格中。 我使用了jdbc: Outsport-网关或适配器将记录插入到表中。 但是如何使用jdbc:出站网关从我的地图列表中插入所有记录。

  • 问题内容: 入站和出站通道适配器之间的根本区别是什么? 任何示例都将非常有帮助。 我已经查看过Spring文档,这种“方向性”的区别对我来说还不清楚。我支持配置了outbound-channel-adapter的应用程序,但是我发现使用 出站 标签可以直观地了解行为计数器。该适配器获取一个外部文件,然后 将其 引入应用程序中, 在 该应用程序中我们解析文件并保留数据。 这类似于这个问题,但是我想更

  • 我是Spring整合的新手。 我有一个流,根据某些条件,我需要在其上执行超文本传输协议或tcp调用。我关注的问题与超文本传输协议调用有关。调用的其余endpoint需要一个访问令牌作为身份验证的标头参数,该参数由具有2个方法和的Spring服务提供。我想仅在过期时调用方法刷新。 我想做的是在执行对rest api的调用时添加以下逻辑: 如果令牌过期,restendpoint返回401,我希望在流中

  • 我正在尝试构建一个集成解决方案,其中 我的outboundgateway定义为 请求工厂在哪里 快乐之路运行良好,我面临的问题是不太快乐的道路。 当Api调用返回错误响应时。转换(exetrnaldto到dto)失败,客户端获得500 我想把error resposne json也翻译成我的json 我如何处理错误情况 我的问题是: 如何处理错误。 在错误条件下如何停止流不转换 如何将状态代码从出