当前位置: 首页 > 知识库问答 >
问题:

Spring集成Kafka制作Kafka时如何处理错误

何承
2023-03-14

当使用int-kafka: out站通道适配器生成到kafka时,似乎没有可用的错误通道。在这种情况下,如何处理reties次后无法向kafka生成消息

任何可能导致kafka失败的错误。(以下代码只是来自Internet的代码片段,只是想知道如何向其添加错误句柄)

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="true"
                                    channel="inputToKafka"
                                    topic="test">
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>

<task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>

<int-kafka:producer-context id="kafkaProducerContext" producer-properties="producerProps">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration broker-list="XXXXXX:6667"
                   key-class-type="java.lang.String"
                   value-class-type="java.lang.String"
                   topic="rating"
                   value-serializer="kafkaSerializer"
                   key-serializer="kafkaSerializer"
                   compression-type="none"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>


<util:properties id="producerProps">
    <prop key="queue.buffering.max.ms">500</prop>
    <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
    <prop key="queue.buffering.max.messages">10000</prop>
    <prop key="retry.backoff.ms">100</prop>
    <prop key="message.send.max.retries">2</prop>
    <prop key="send.buffer.bytes">5242880</prop>
    <prop key="socket.request.max.bytes">104857600</prop>
    <prop key="socket.receive.buffer.bytes">1048576</prop>
    <prop key="socket.send.buffer.bytes">1048576</prop>
    <prop key="request.required.acks">1</prop>
</util:properties>

共有1个答案

乔望
2023-03-14

请按照int jpa中异常处理问题的答案:更新出站网关。

既然你有

 类似资料:
  • 我如何处理在Spring整合中未能向Kafka传达的信息? 我在“int kafka:outbound channel adapter”中没有看到“error channel”是一个选项,我想知道应该在哪里添加错误通道信息,以便我的ErrorHandler可以获得“failed to kafka”类型的错误。(包括所有类型的故障、配置、网络等) 此外,inputToKafka是排队通道,我应该在哪

  • 我们一直在使用SI Kafka进行一个新项目,并取得了很大成功。在最近的一次切换之前,我们使用KafkaTopicOffsetManager来管理我们的消费者主题偏移量。为了避免每个消费者/主题对都有额外的主题,并使用Burrow或lag监控,我们决定使用最新的KafkaNativeOffsetManager,它使用Kafka提供的本机偏移管理。但在切换之后,我们注意到目标主题的消息消耗持续滞后。

  • 我的pom.xml是: 我错过了什么?

  • 我有一个rest postendpoint,它使用Spring Cloud Stream Kafka绑定器来使用数据和写到Kafka。现在我们没有任何错误处理到位。但是我们希望通过在没有将数据写入Kafka时添加一个额外的检查来使这个endpoint容错。当数据没有写入Kafka时,我们打算发送一个异常信息对象。我试图用这种方法使用全局错误来实现这一点 我的怀疑有两个方面: 当我们写入Kafka的

  • 大家好,我们使用的是Spring kafka 1.3.3,我们的应用程序是消耗-进程-发布管道。 如果在生产阶段流水线出现任何故障,我们如何处理重试并寻求返回。例如:应用程序正在消耗消息,处理它们并以异步方式发布到另一个主题中。但如果在发布中有任何错误

  • 我有以下Kafka配置类: 以及以下KafkaListener: 我想使用<code>SeekToCurrentErrorHandler</code>进行错误处理,我想使用类似于这里的特定功能,但目前我正在使用<code>springBootVersion=2.0.4。你能帮我设置依赖项和配置以处理Kafka消费者中的错误吗? 问候!