当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream、Kafka Stream和本地Kafka Stream应用程序和生产者之间不兼容的Avro消息

汪兴旺
2023-03-14
  • kafka-xxx:本机应用程序
  • spring-boot-xxx:Spring Cloud Stream Applications

问题是由原生Kafka生成器生成的Avro消息不能被Spring Cloud Stream应用程序解封,例如:

原生Kafka生产者(Kafka-客户-服务项目)

@Component
class CustomerProducer {

    private val producer: KafkaProducer<Int, Customer>

    init {
        val props = Properties()
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        props[ProducerConfig.CLIENT_ID_CONFIG] = "kafka-customer-producer"
        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = IntegerSerializer::class.java.name
        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java.name
        props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
        props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
        producer = KafkaProducer(props)
    }


    fun sendCustomerEvent(customer: Customer) {
        val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
        producer.send(record)
    }
}
    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<Int, Customer>, ...): KStream<Int, OrderShippedEvent> {

        val serdeConfig = mapOf(
                AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081")

        val intSerde = Serdes.IntegerSerde()
        val customerSerde = SpecificAvroSerde<Customer>()
        customerSerde.configure(serdeConfig, false)

        val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
                Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
                        .withKeySerde(intSerde)
                        .withValueSerde(customerSerde)

        val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
                .reduce({ _, y -> y }, stateStore)

        ...
spring:
  application:
    name: spring-boot-customer-service
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key:
                  serializer: org.apache.kafka.common.serialization.IntegerSerializer
      bindings:
        output:
          destination: customer
          contentType: application/*+avro
      schema-registry-client:
        endpoint: http://localhost:8081

---

@Service
class CustomerServiceImpl(private val customerKafkaProducer: Source) : CustomerService {
   ...
   val message = MessageBuilder.withPayload(customer).setHeader(KafkaHeaders.MESSAGE_KEY, customer.id).build()
   customerKafkaProducer.output().send(message)
   ...

    val builder = StreamsBuilder()

    val streamsConfiguration = Properties()
    streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "kafka-shipping-service"
    //streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray()::class.java.name)
    //streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde::class.java)
    streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
    streamsConfiguration[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"

    val serdeConfig = mapOf(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081",
        AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY to TopicRecordNameStrategy::class.java.name
    )

    //val byteArraySerde = Serdes.ByteArray()
    val intSerde = Serdes.IntegerSerde()
    val customerSerde = SpecificAvroSerde<Customer>()
    customerSerde.configure(serdeConfig, false)

    val customerStream = builder.stream<Int, Customer>("customer",
        Consumed.with(intSerde, customerSerde)) as KStream<Int, Customer>

    val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
        Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
            .withKeySerde(intSerde)
            .withValueSerde(customerSerde)

    val customerTable = customerStream
        .map { key, value -> KeyValue(key, value) }
        .groupByKey(Serialized.with(intSerde, customerSerde))
        .reduce({ _, y -> y }, stateStore)

在本例中,本机应用程序直接崩溃,出现异常(org.apache.kafka.common.errors.SerializationException:Unknown magic Byte!)

Exception in thread "kafka-shipping-service-b89157ba-b21f-46ba-911d-97f6080d477e-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
    at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Disconnected from the target VM, address: '127.0.0.1:57856', transport: 'socket'

Process finished with exit code 0

如何确保Spring Cloud Stream Productor/Native Kafka Productor在异构企业环境中生成的消息的兼容性,在异构企业环境中,消费者可能是Spring Cloud Stream Katfka Stream应用程序和Native Kafka Stream?

共有1个答案

杨无尘
2023-03-14

@与第一个案例相关-您有一个使用kafkaavroserializer的本地Kafka生产者和使用Spring Cloud Stream提供的avro反序列化器的Spring Cloud Stream消费者。因为您使用的是不兼容的序列化器/反序列化器,所以这将不起作用。为了解决这个问题,您需要在Spring Cloud Stream端启用UsenativeDecoding并提供avro Serde(SpecificaVroserde)。这样,您就使用了相同的序列化/反序列化策略。

对于第二种情况,当序列化器不匹配时,您将得到经典错误(Unknown magic Byte!)。同样的问题。您有一个Spring Cloud Stream producer使用框架中的序列化html" target="_blank">程序,但在消费端使用specificaVroserde。为了解决这里的问题,您可以在生产者端打开UsenativeEncoding,并使用avro序列化程序。或者将Spring Cloud Stream中的Avro序列化程序包装在serde中,并提供给使用者。

我认为这里的底线是,当使用avro作为数据交换格式时,您需要确保在依赖该数据的微服务链中使用相同的序列化/反序列化策略。

 类似资料:
  • 我在使用不同版本的 Spring-kafka 的应用程序的 kafka 使用者方面遇到了问题,特别是在 2.3.13.RELEASE 和 2.8.3 之间。 当使用蓝色/绿色策略部署到PRO时,我遇到的问题是,当在使用者上使用并发时,所有分区都被分配给使用版本2.3.13.RELEASE的应用程序,而在完成分区重新平衡时,使用Spring kafka版本2.8.3(同一主题和组名)部署使用者的新应

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 我们有一个运行在java 7上的服务器端进程:java-version:java version“1.7.0”java(TM)SE运行时环境(build 1.7.0-b147)java HotSpot(TM)64位服务器VM(build 21.0-b17,混合模式) 它接受来自我们自己开发的java应用程序(通过正确签名的JNLP启动)的SSL连接。 通常情况下,不管客户机应用程序是运行在Java

  • 问题内容: 我想向同一队列发送一批20k JMS消息。我使用10个线程将任务拆分,因此每个线程将处理2k条消息。我不需要交易。 我想知道是否建议建立一个连接,一个会话和10个生产者? 如果所有线程共享一个生产者,该怎么办?我的消息会损坏还是会同步发送(不会提高性能)? 如果我总是连接到同一队列,那么决定是创建新连接还是会话的一般指导方针是什么? 谢谢你,很抱歉一次问了很多。 问题答案: 如果某些消

  • 使用StreamBridge,我将包含两种不同类型的对象的消息发送到单个Kafka主题。有没有办法定义一个能够使用两种类型消息的Spring Cloud Stream的功能消费者?

  • 我对Spring的Kafka还是有点陌生。我的问题很简单。我有一个仅限消费者使用的应用程序,它可以连续读取Kafka,处理消息,并使用Ack侦听器手动确认消息。我有一个上游生产者专用应用程序的依赖项,在该应用程序中,他们负责向Kafka主题发送消息,以便我使用。我们最近在生产者和消费者之间实现了事务,但我想了解更多关于故障点的信息,以及如何处理那些回滚的事务,以便它们不会丢失?我已经读到,最好使用