当前位置: 首页 > 知识库问答 >
问题:

spring-Could Steam消息转换异常

蓬琦
2023-03-14

在将我们的一个服务升级到spring-cloud-stream2.0.0.rc3时,当试图使用一个使用较旧版本spring-cloud-stream-ditmars的服务生成的消息时,我们遇到了一个异常。发行版:

错误31241---[container-4-C-1]O.S.Integration.Handler.LoggingHandler:org.SpringFramework.Messaging.Converter.MessageConversionException:无法将GenericMessage[Payload=Byte[371],Headers={kafka_offset=1,kafka_Consumer=org.Apache.kafka_Clients.Consumer.kafka_Clients.Consumer@62029D0D,kafka_TimeStampType=Create_Time,21641760698,timestamp=1521641772477}]位于org.springframework.messaging.handler.annotation.support.payloadargumentresolver.resolveargument(payloadargumentresolver.java:144)位于org.springframework.messaging.handler.invocation.handler.methodargumentargumentresolver.java:144)位于org.springframework.messaging.handler.invocation.resolveargument(Tream.binding.dispatchingStreamListenerMessageHandler.Handler.RequestMessage(DispatchingStreamListenerMessageHandler.java:87)位于org.SpringFramework.Integration.Handler.AbstractReplyProducingMessageHandler.HandleMessageInternal(AbstractReplyProducingMessageHandler.109)位于org.SpringFramework.Integration.Handler.AbstractMessageHandler.AbstractMessage.157)位于L.send(AbstractMessageChannel.java:407)在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.160)在org.springframework.messaging.core.GenericMessagingTemplate.doSend(n消息(KafkaMessageDrivenChannelAdapter.java:364)位于org.springframework.kafka.listener.kafkaMessageListenerContainer$listenerConsumer.doInvokeRecistener(KafkaMessageListenerContainer.kafkaMessageListener.java:1001)位于org.springframework.kafka.listener.kafkaMessageListener.java:1001)位于

原因似乎是与消息一起发送的ContentType头是text/plain,尽管它应该是application/json
生产者配置

spring:
  cloud:
      stream:
        kafka:
          binder:
            brokers: kafka
            defaultBrokerPort: 9092
            zkNodes: zookeeper
            defaultZkPort: 2181
            minPartitionCount: 2
            replicationFactor: 1
            autoCreateTopics: true
            autoAddPartitions: true
            headers: type,message_id
            requiredAcks: 1
            configuration:
              "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
          bindings:
            user-output:
              producer:
                sync: true
                configuration:
                  retries: 10000
        default:
          binder: kafka
          contentType: application/json
          group: user-service
          consumer:
            maxAttempts: 1
          producer:
            partitionKeyExtractorClass: com.watercorp.user_service.messaging.PartitionKeyExtractor
        bindings:
          user-output:
            destination: user
            producer:
              partitionCount: 5

null

spring:
  cloud:
      stream:
        kafka:
          binder:
            brokers: kafka
            defaultBrokerPort: 9092
            minPartitionCount: 2
            replicationFactor: 1
            autoCreateTopics: true
            autoAddPartitions: true
            headers: type,message_id
            requiredAcks: 1
            configuration:
              "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
          bindings:
            user-input:
              consumer:
                autoRebalanceEnabled: true
                autoCommitOnError: true
                enableDlq: true            
        default:
          binder: kafka
          contentType: application/json
          group: enrollment-service
          consumer:
            maxAttempts: 1
            headerMode: embeddedHeaders
          producer:
            partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor
            headerMode: embeddedHeaders
        bindings:          
          user-input:
            destination: user
            consumer:
              concurrency: 5
              partitioned: true          

消费者@StreamListener:

    @StreamListener(target = UserInput.INPUT, condition = "headers['type']=='" + USERS_DELETED + "'")
    public void handleUsersDeletedMessage(@Valid UsersDeletedMessage usersDeletedMessage, @Header(value = "kafka_receivedPartitionId",
            required = false) String partitionId, @Header(value = KAFKA_TOPIC_HEADER_NAME, required = false) String topic, @Header(MESSAGE_ID_HEADER_NAME) String messageId) throws Throwable {
        logger.info(String.format("Received users deleted message message, message id: %s topic: %s partition: %s", messageId, topic, partitionId));
        handleMessageWithRetry(_usersDeletedMessageHandler, usersDeletedMessage, messageId, topic);
    }

共有1个答案

曾实
2023-03-14

这是RC3中的一个bug;最近固定在主人身上;它将在本月底的GA版本中发布。同时,您能尝试使用2.0.0.build-Snapshot吗?

我能够重现这个问题,并用快照为我修复了它。

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
    </dependency>

编辑

关于完整性:

Ditmars生产者

@SpringBootApplication
@EnableBinding(Source.class)
public class So49409104Application {

    public static void main(String[] args) {
        SpringApplication.run(So49409104Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> {
            Foo foo = new Foo();
            foo.setBar("bar");
            output.send(new GenericMessage<>(foo));
        };
    }


    public static class Foo {

        private String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

而且

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: so49409104a
          content-type: application/json
          producer:
            header-mode: embeddedHeaders

埃尔姆赫斯特消费者:

@SpringBootApplication
@EnableBinding(Sink.class)
public class So494091041Application {

    public static void main(String[] args) {
        SpringApplication.run(So494091041Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(Foo foo) {
        System.out.println(foo);
    }

    public static class Foo {

        private String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

而且

spring:
  cloud:
    stream:
      bindings:
        input:
          group: so49409104
          destination: so49409104a
          consumer:
            header-mode: embeddedHeaders
          content-type: application/json

结果:

Foo [bar=bar]

header-mode是必需的,因为2.0中的默认值是native,现在Kafka支持本机标头。

 类似资料:
  • 我在使用spring-jms模块转换来自RabbitMQ的消息时遇到了一些问题。以前,我使用Rest APIendpoint发送消息,该endpoint将消息发送到RabbitMQ队列,并使用@JMSListener方法处理它。 在内部,这种行为添加了一个字段来确定Java类型,由Spring库管理。但是,现在我想避免Rest API调用,因为它不是必需的,而且我可以直接将消息发送到RabbitM

  • Spring JMS中的类具有setter方法,它允许提供我们想要的任何转换器。 对于带注释的消息侦听器,这很有意义,因为我们可以直接定义 spring将负责将消息转换并传递给这个侦听器。 因此,这对于带注释的听者来说显然是有意义的。我的问题是,设置消息转换器对非注释消息侦听器有用吗?类似于 从我在docs/javadocs中的搜索和对源代码的有限理解来看,我认为为这种情况设置消息转换器是没有帮助

  • 使用MVC Java编程配置方式时,如果你想替换Spring MVC提供的默认转换器,完全定制自己的HttpMessageConverter,这可以通过覆写configureMessageConverters()方法来实现。如果你只是想定制一下,或者想在默认转换器之外再添加其他的转换器,那么可以通过覆写extendMessageConverters()方法来实现。 下面是一段例子,它使用定制的Ob

  • 我有一个JMS侦听器,它正在从另一个应用程序接收字节消息。当前应用程序正在使用Spring JMS。我想在这里介绍spring集成。因此,我添加了以下示例代码来侦听消息。 然后我得到如下类强制转换异常: 我得到了一个ByteMessage,但我没有找到一个关于如何提取带有字节数组有效负载的ByteMessage的好例子。我是Spring集成世界的新手。

  • 我正在尝试使用Spring进行GET http请求。 我的主要班级: 使用CatMessage类: 我应该得到回来,因为我使用不工作的用户名和密码组合(和这个服务器部分工作正常),是: { } 我认为这应该行得通,因为我几乎是在复制Spring for Android的基本auth项目 但是发生的是解析的问题(我认为)。当然,我已经包含了Jackson和Spring依赖项,所以我不希望我的问题出现

  • 我正在使用Spring Cloud Stream和RabbitMQ活页夹。它可以很好地处理字节[]负载和Java本机序列化,但我需要处理JSON负载。 这是我的处理器类。 输入到和输出到是带有Jackson注释的POJO。 如何配置JSON转换策略 消息头应该如何被接受和处理