Question:

Spring Kafka with an Avro deserializer

欧阳德运
2023-03-14
Error while processing: ConsumerRecord(topic = topi_name, partition = 2, offset = 149, CreateTime = 1592288763784, serialized key size = 16, serialized value size = 38, headers = RecordHeaders(headers = [], isReadOnly = false), key = event_test, value = {"eventType": "test", "EventDataRequest": {"user": "54321", "panId": "1234", "empId": "5"}})

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void className(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, org.apache.avro.generic.GenericRecord>) throws com.test.kafka.exception.ConsumerException,org.apache.xmlbeans.XmlException,java.io.IOException,java.lang.ClassNotFoundException' threw exception; nested exception is java.lang.ClassCastException: com.test.MyPojo cannot be cast to com.test.MyPojo; nested exception is java.lang.ClassCastException: com.test.MyPojo cannot be cast to com.test.MyPojo

Consumer configuration

@Bean
@DependsOn("consumerFactory")
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory(@Qualifier("consumerFactory") ConsumerFactory<String, GenericRecord> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    return factory;
}

@Bean(name = "consumerFactory")
public ConsumerFactory<String, GenericRecord> consumerFactory() {
    Map<String, Object> config = new HashMap<>(kafkaProperties.getConsumer().buildProperties());
    config.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    return new DefaultKafkaConsumerFactory<>(config);
}

Kafka listener

@KafkaListener(topics = "${topic}", groupId = "${group-id}", containerFactory = "kafkaListenerContainerFactory")
public void avroConsumer(ConsumerRecord<String, GenericRecord> record) {
    // here it throws class cast exception
    System.out.printf("Listener value = %s%n", (GeneratedAvroPojoClass) record.value());
}
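
Side note: because specific.avro.reader is true in the consumer config below, the Confluent deserializer already returns instances of the generated SpecificRecord class, so the listener could be declared against that type and the cast dropped entirely. A sketch, reusing GeneratedAvroPojoClass from above (this alone would not fix the ClassCastException if two classloaders are involved):

@KafkaListener(topics = "${topic}", groupId = "${group-id}", containerFactory = "kafkaListenerContainerFactory")
public void avroConsumer(ConsumerRecord<String, GeneratedAvroPojoClass> record) {
    // No cast needed: the deserializer already produced the generated class
    System.out.printf("Listener value = %s%n", record.value());
}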

Producer configuration

@Bean(name = "customProducerFactory")
public ProducerFactory<String, GenericRecord> customProducerFactory() {
    Map<String, Object> config = new HashMap<>(kafkaProperties.getProducer().buildProperties());
    config.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    return new DefaultKafkaProducerFactory<>(config);
}

@Bean(name = "kafkaTemplate")
@DependsOn("customProducerFactory")
public KafkaTemplate<String, GenericRecord> kafkaTemplate(@Qualifier("customProducerFactory") ProducerFactory<String, GenericRecord> customProducerFactory){
    return new KafkaTemplate<>(customProducerFactory, true);
}
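
For completeness, a sketch of how a GenericRecord like the one in the error log might be sent with this template. The schema here is an assumption reconstructed from the logged value (the real one presumably also contains the nested EventDataRequest record); imports from org.apache.avro and org.apache.avro.generic are assumed:

// Hypothetical schema, inferred from the logged record value
Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"EventTest\",\"fields\":["
        + "{\"name\":\"eventType\",\"type\":\"string\"}]}");
GenericRecord value = new GenericData.Record(schema);
value.put("eventType", "test");
kafkaTemplate.send("topic_name", "event_test", value);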

YAML configuration

custom:
  kafka:
    topic: topic_name
    bootstrap-servers:  ******
    producer:
      acks: all
      client-id: client_id
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: *****
        auto.register.schemas: true
        value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
    consumer:
      enable-auto-commit: true
      auto-offset-reset: earliest
      group-id: group_id_consumer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        schema.registry.url: ******
        specific.avro.reader: true
        value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Configuration values from the startup logs:

ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = ******
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    group.id = ********
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    security.protocol = PLAINTEXT
    value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer

KafkaAvroDeserializerConfig values:
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    specific.avro.reader = true
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
    schema.registry.url = [*****]
    basic.auth.user.info = [hidden]
    proxy.host =
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

ProducerConfig values:
    acks = all
    batch.size = 16384
    bootstrap.servers = [*******]
    buffer.memory = 33554432
    client.id = client_id
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class io.confluent.kafka.serializers.KafkaAvroSerializer

KafkaAvroSerializerConfig values:
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
    schema.registry.url = [*******]
    basic.auth.user.info = [hidden]
    proxy.host =
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

1 Answer

爱博达
2023-03-14

com.test.MyPojo cannot be cast to com.test.MyPojo

This usually means there is a classloader problem: the deserializer was instantiated by a different classloader than the one used for the @KafkaListener method.

You need to figure out why; it is impossible to answer that from the static information alone.
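
One way to confirm the diagnosis from inside the listener is to compare the two classloaders directly; if they differ, the cast is guaranteed to fail even though the class names match. A sketch, reusing GeneratedAvroPojoClass from the question:

// Inside avroConsumer, before the cast:
Object value = record.value();
System.out.println("value loaded by:    " + value.getClass().getClassLoader());
System.out.println("expected loaded by: " + GeneratedAvroPojoClass.class.getClassLoader());
// Two different loaders (e.g. RestartClassLoader vs. the app classloader)
// mean the same class bytes were loaded twice; the ClassCastException follows.

A frequent culprit for "X cannot be cast to X" in Spring Boot applications is spring-boot-devtools: its RestartClassLoader loads the application's classes (including generated Avro POJOs), while the Kafka client reflectively builds the deserializer from the class name in the config map on a different loader. If that turns out to be the cause here, one workaround is to instantiate the deserializers in the factory bean so that everything is created by the same classloader. A minimal sketch based on the question's consumerFactory (imports from org.apache.kafka.common.serialization and io.confluent.kafka.serializers assumed):

@Bean(name = "consumerFactory")
public ConsumerFactory<String, GenericRecord> consumerFactory() {
    Map<String, Object> config = new HashMap<>(kafkaProperties.getConsumer().buildProperties());
    config.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    // Instantiate the deserializers here rather than letting the Kafka client
    // create them from class names, so they are loaded by the same classloader
    // as the listener and the generated Avro classes.
    KafkaAvroDeserializer avroDeserializer = new KafkaAvroDeserializer();
    avroDeserializer.configure(config, false); // false = configure as value deserializer
    @SuppressWarnings("unchecked")
    Deserializer<GenericRecord> valueDeserializer =
            (Deserializer<GenericRecord>) (Deserializer<?>) avroDeserializer;
    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), valueDeserializer);
}

Alternatively, removing spring-boot-devtools, or telling devtools to load the Avro serializer jars with the restart classloader via a restart.include entry in META-INF/spring-devtools.properties, gets both sides onto the same loader.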

