当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud SteamKafka误差通道

欧阳安阳
2023-03-14

我正在尝试建立一个绑定,将Kafka消息从Spring Integration errorChannel转发到一个自定义通道(用于集中错误处理)。

错误消息被发送到配置的通道,但它们是以genericmessage的形式到达的,负载为byte[](包含异常详细信息和失败消息)。

我的配置:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          accountOut.producer:
            sync: true
        binder:
          autoCreateTopics: false
          headers:
               - spanId
               - spanTraceId
               - spanSampled
               - spanParentSpanId
               - spanName
               - spanFlags
               - eventType
               - Authorization
      bindings:
        error:
          destination: test-error
        accountOut:
          producer.partitionKeyExpression: payload.key
          content-type: application/json
          destination: account
  kafka:
    producer.keySerializer: org.apache.kafka.common.serialization.StringSerializer
    consumer.valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer

阅读文档时,我希望消息以errormessage的形式到达。我有什么办法可以做到这一点吗?还是将有效负载配置为作为对象到达?

我正在使用的版本:

  • Spring Boot 1.5.8
  • Spring Cloud Edgware
  • Kafka11客户
  • Spring Integration core 4.3.12
@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    return handler;
}

但是是否可以在属性yaml中配置这个流,而不是在代码中配置这个流呢?这就是所有其他Kafka配置的地方,所以在代码中配置Kafka模板并不理想。

另一个选择是显式侦听ErrorMessage,并在代码中发送到kafka输出通道:

@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
    outputChannel.kieranError().send(...)
}

共有1个答案

施弘壮
2023-03-14

你到底是在哪里消费这样的信息呢?

enabledlq对于使用者为true时,您描述的消息听起来像是发送到DLQ主题的消息;你没有显示消费者配置,所以我很难猜。

发送到目标特定错误通道(并桥接到全局errorchannelerrormessage可以使用

@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
    ...
}
error:
  destination:
 类似资料:
  • 除了这一部分,我大部分都懂了: 我不知道这行是干什么的:

  • 我正在使用Kafka生产者发布消息到一些其他Kafka的主题,它的工作相当好。下面的示例模板: 上述语句是否支持kafka消息驱动入站通道适配器中所支持的errorchannel? 每当我传出的kafka服务器宕机,我无法发布它时,我需要它来审计错误计数。

  • 我正在用Spring构建WebApi,用ReactJS构建客户机。我试图执行一个POST请求,用OAuth2对WebApi进行身份验证,但我不断得到 WebSecurityConfigurerAdapter: 我的要求是:

  • 完全错误: 原因:org.springframework.beans.factory.beanCreationException:无法自动执行字段:private com.flex.eventmanagement.handler.helper.messageformanagement.handler.helper.notificationpreprocessor.messageformathelp

  • 我在下面得到了三个多小时的例外。有人知道怎么摆脱它吗? javax。坚持不懈PersistenceException:Exception[EclipseLink-4002](Eclipse持久性服务-2.3.2.v20111125-r10461):org。日食坚持不懈例外。DatabaseException内部异常:java。sql。SQLException:暂时性错误,请重试。错误代码:1在组织

  • 误差线图通常表示数据的可变范围、误差范围。 图4-17 Highcharts 误差线图 误差线图相关的 API: 误差线数据列配置 误差线数据列 一、误差线相关配置 通过指定数据列的类型为 errorbar 即可创建误差线数据列。由于误差线是主数据列的附属数据列,是对主数据列进行补充说明,所有需要一个主数据列与其进行关联。 1、关联数据列 通过 linkedTo 可以指定误差线关联的数据列,lin