当前位置: 首页 > 知识库问答 >
问题:

Spring-cloud-stream MessageConversionException

仲涵亮
2023-03-14

我有一个接受@Payload字符串的@StreamListener。为了测试这个侦听器类,我使用嵌入式Kafka编写了一个Junit类。运行我的测试类时,我得到以下错误

错误O.S.I.Handler.LoggingHandler-org.SpringFramework.Messaging.Converter.MessageConversionException:无法在此处输入代码将GenericMessage从[[B]转换为[java.lang.String]

如果我将@Payload的数据类型从String更改为Byte[],那么我的侦听器类将选择消息

有人能帮我知道这是什么问题吗?我猜这是与Could Steam配置有关的东西。

@ExtendWith(SpringExtension.class)
@DirtiesContext
@SpringBootTest(classes = IntegrationTestConsumer.class)
@EmbeddedKafka(partitions = 1, controlledShutdown = true,
        topics = {
                "input",
                "output"})
public class TestUtils {

    public static final String KEY_SERIALIZER = "key.serializer";
    public static final String VALUE_SERIALIZER = "value.serializer";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @BeforeEach
    public void setup() {
        System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void someTest() throws Exception {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        senderProps.put(KEY_SERIALIZER, StringSerializer.class);
        senderProps.put(VALUE_SERIALIZER, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory, true);
        template.setDefaultTopic("input");
        template.sendDefault("foo");

        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
                "input_group",
                "false",
                this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", StringDeserializer.class);
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);

        Consumer<String, String> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton("output"));
        ConsumerRecords<String, String> records = consumer.poll(10_000);
        consumer.commitSync();
        Assertions.assertThat(records.count()).isGreaterThanOrEqualTo(1);
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        bindings:
          input:
            consumer:
              enable-dlq: true
              dlq-name: output
              dlq-producer-properties:
                retries: 1
        binder:
          brokers: ${spring.embedded.kafka.brokers}
          replicationFactor: ${replication_factor:1}
          autoCreateTopics: true
          autoAddPartitions: true
          configuration:
            retries: 1
            batch.size: 16384
            linger.ms: 1
            enable.idempotence: true
            buffer.memory: 33554432
            request.timeout.ms: 3000
            transaction.timeout.ms: 3000
            max.block.ms: ${kafka_max_block_time:5000}
            max.poll.records: 80
            poll.timeout: 10000
            commit.retries: 1
            commit.retry.interval: 1000
            session.timeout.ms.config: 50000
            shutdown.signal: INT,TERM
            acks: "all"
      bindings:
        output:
          destination: output
          contentType: application/json
          producer:
            partitionCount: ${partition_count:1}
        input:
          destination: input
          contentType: application/json
          partitioned: true
          group: input_group

共有1个答案

苏乐童
2023-03-14

请检查是否已模拟ObjectMapper,因为ObjectMapper无法将字节[]转换为字符串。

 类似资料:
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智能路由,微代理,控制总线)。分布式系统的协调导致了样板模式, 使用Spring Cloud开发人员可以快速地支持实现这些模式的服务和应用程序。他们将在任何分布式环境中运行良好,包括开发人员自己的笔记本电脑,裸机数据中心,以及Cloud Foundry等托管平台。

  • Spring Cloud 为开发人员提供了工具,以快速构建分布式系统中的某些常见模式(例如:配置管理、服务发现、智能路由、微代理、控制总线、一次性令牌、全局锁、分布式会话、群集状态等)。分布式系统的协调导致了样板式样,并且使用Spring Cloud开发人员可以快速站起来实现这些样板的服务和应用程序。它们可以在任何分布式环境中正常工作,包括开发人员自己的笔记本电脑,裸机数据中心以及Cloud Fo

  • Cloudfoundry的Spring Cloud可以轻松地在Cloud Foundry(平台即服务)中运行 Spring Cloud应用程序 。Cloud Foundry有一个“服务”的概念,它是“绑定”到应用程序的中间件,本质上为其提供包含凭据的环境变量(例如,用于服务的位置和用户名)。 spring-cloud-cloudfoundry-web项目为Cloud Foundry中的webapp

  • 我们最近从Spring Cloud Netflix Ribbon迁移到Spring Cloud LoadBalancer,并使用Spring Cloud kubernetes作为发现客户端。 现在spring.cloud.kubernetes.ribbon.mode(https://cloud.spring.io/spring-cloud-static/spring-cloud-kubernete

  • 主要内容:Spring Cloud Config,Spring Cloud Config 工作原理,Spring Cloud Config 的特点,搭建 Config 服务端,搭建 Config 客户端,手动刷新配置,Config+Bus 实现配置的动态刷新在分布式微服务系统中,几乎所有服务的运行都离不开配置文件的支持,这些配置文件通常由各个服务自行管理,以 properties 或 yml 格式保存在各个微服务的类路径下,例如 application.properties 或 applicat

  • 主要内容:API 网关,Spring Cloud Gateway ,Gateway 的工作流程,Predicate 断言,Spring Cloud Gateway 动态路由,Filter 过滤器在微服务架构中,一个系统往往由多个微服务组成,而这些服务可能部署在不同机房、不同地区、不同域名下。这种情况下,客户端(例如浏览器、手机、软件工具等)想要直接请求这些服务,就需要知道它们具体的地址信息,例如 IP 地址、端口号等。 这种客户端直接请求服务的方式存在以下问题: 当服务数量众多时,客户端需要维护

  • 主要内容:熔断器,Spring Cloud Hystrix ,Hystrix 服务降级,示例1,全局降级方法,解耦降级逻辑,Hystrix 服务熔断,Hystrix 故障监控在微服务架构中,一个应用往往由多个服务组成,这些服务之间相互依赖,依赖关系错综复杂。 例如一个微服务系统中存在 A、B、C、D、E、F 等多个服务,它们的依赖关系如下图。 图1:服务依赖关系 通常情况下,一个用户请求往往需要多个服务配合才能完成。如图 1 所示,在所有服务都处于可用状态时,请求 1 需要调用 A、D、E、F

  • 主要内容:OpenFeign,Feign VS OpenFeign ,OpenFeign 实现远程服务调用,OpenFeign 超时控制,OpenFeign 日志增强Netflix Feign 是 Netflix 公司发布的一种实现负载均衡和服务调用的开源组件。Spring Cloud 将其与 Netflix 中的其他开源服务组件(例如 Eureka、Ribbon 以及 Hystrix 等)一起整合进 Spring Cloud Netflix 模块中,整合后全称为 Spring Cloud Ne