当前位置: 首页 > 知识库问答 >
问题:

消费者使用spring-cloud-stream-kafka-binder生成的spring-kafka生成的avro消息

穆鸿波
2023-03-14

我试图使用ConsumerSeeKaware,阅读kafka主题中可用的最后一条消息。消息类型是Avro对象列表。我能成功地做到这一点。但在反序列化过程中会失败。该消息使用spring-cloud-stream-kafka框架生成。消息具有contentTypecontentType=application/x-java-object;type=java.util.ArrayList

我知道avro消息可以像下面这样反序列化。

 DatumReader<GenericRecord> datumReader =
            new SpecificDatumReader<>(targetType.newInstance().getSchema());
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

        result = (T) datumReader.read(null, decoder);

但不管用。可能是因为两件事。

>

  • 消息是avro对象的列表。但我正在尝试使用Avro模式创建DataReader。但是我尝试创建schema.createArray(userDTo.class)这样的模式。但它并不起作用。

    我认为avro消息的预期内容类型是application/avro,但当消息由s-c-s生成时,它是contenttype=application/x-java-object;

    我试图通过实现org.apache.kafka.common.serialization.deserializer来创建一个反序列化程序,并构造KafkaConsumerFactory。有人能帮忙吗?

  • 共有1个答案

    屠振濂
    2023-03-14

    请参阅property...Producer.UsenativeEncode(如生成器属性中所示)。

    使用编码

    当设置为true时,出站消息由客户端库直接序列化,客户端库必须进行相应的配置(例如,设置适当的Kafka生产者值序列化器)。使用此配置时,出站消息封送不基于绑定的contentType。当使用本机编码时,使用者有责任使用适当的解码器(例如:Kafka使用者值反序列化器)来反序列化入站消息。此外,当使用本机编码/解码时,headerMode属性将被忽略,头将不会嵌入到消息中。

     类似资料:
    • 使用Spring-Cloud-Stream的kafka绑定器,如何配置并发消息消费者(在单个消费者jvm中)?如果我没有理解错的话,在使用kafka时并发使用消息需要分区,但是s-c-s文档指出,要使用分区,您需要通过partitionKeyExpression或PartitionKeyExtractorClass在生成器中指定分区选择。Kafka博士提到循环分区。 s-c-s文档根本没有提到sp

    • 在我们的spring boot应用程序中,我们注意到Kafka消费者偶尔会在prod env中随机消费两次消息。我们在PCF中部署了6个实例和6个分区。我们发现在同一主题中收到两次具有相同偏移量和分区的消息,这会导致重复,对我们来说是业务关键。我们在非生产环境中没有注意到这一点,在非生产环境中很难复制。我们最近转向Kafka,但我们无法找到根本问题。 我们使用的是spring cloud stre

    • 我用的是Spring boot 1.5.9.RELEASE和Spring cloud Edgware。跨微服务发布。 我使用注释绑定了一个消费者。注释将完成我使用事件的其余部分。 出现了一些手动配置主题名称和其他一些配置属性的需求,我希望在应用程序启动时覆盖application.properties中定义的一些消费者属性。 有什么直接的方法吗?

    • 我用本地安装的Confluent4.0.0尝试了官方模式-注册表-汇合示例(Consumer/Producer),它可以在发送post请求和在listener接收时发送“Sensor”avro消息,但当我使用Confluent4.0.0附带的kafka-avro-console-consumer工具查看发送的avro消息时,该工具引发了以下错误(a)。我还尝试使用kafka-avro-consol

    • 以下是我的情况: 我们有一个Spring cloud Stream 3 Kafka服务连接到同一个代理中的多个主题,但我想基于属性控制连接到特定主题。 每个主题都有自己的活页夹和绑定,但代理对所有人来说都是一样的。 我尝试使用下面的属性禁用绑定(这是我到目前为止找到的唯一解决方案),这适用于StreamListener不接收消息,但与主题的连接和重新平衡仍在发生。 我想知道活页夹级别是否有任何设置

    • 我有一个应用程序(spring-boot-shipping-service),其中包含一个KStream,它获取由外部生产者(spring-boot-order-service)生成的OrderCreatedEvent消息。此生成器使用以下架构: Order-Created-Event.avsc 我的与联接,并向order主题发布一种新的消息:Ordershippedevent。 Order-Sh