当前位置: 首页 > 知识库问答 >
问题:

找不到Spring kafka AVRO生成的类

马峻
2023-03-14

我正在使用confluent JDBC连接器连接到postgres数据库,以检索更改并将其放在Kafka主题中。现在,我想使用spring boot消费者来使用这些消息。这些消息采用AVRO格式。我从连接器中获得了模式,并使用avro-maven插件为其生成了一个POJO类。

但是当侦听器启动时,只有以下错误

   java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
       at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.2.jar:2.7.2]
       at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.2.jar:2.7.2]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) ~[spring-kafka-2.7.2.jar:2.7.2]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.2.jar:2.7.2]
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
       at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
   Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ps_git_repo-0 at offset 0. If needed, please seek past the record to continue consumption.
   Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class ps_git_repo specified in writer's schema whilst finding reader's schema for a SpecificRecord.

当我不使用avro对数据进行反序列化时,我会收到数据但不可读。

在pom中。xml我有以下依赖项


    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.10.2</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>6.2.0</version>
        <exclusions>
            <exclusion>
                <artifactId>netty</artifactId>
                <groupId>io.netty</groupId>
            </exclusion>
        </exclusions>
    </dependency>

在应用程序中。属性我添加了反序列化器和架构注册表url。


    spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer = io.confluent.kafka.serializers.KafkaAvroDeserializer
    spring.kafka.bootstrap-servers = http://localhost:9092
    spring.kafka.consumer.properties.specific.avro.reader = true
    spring.kafka.consumer.properties.schema.registry.url = http://localhost:8081

在构建中,我使用avro-maven插件从连接器创建的模式生成POJO。

pom中的插件。xml


    <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.10.2</version>
        <executions>
            <execution>
                <phase>generate-sources</phase>
                <goals>
                    <goal>schema</goal>
                </goals>
                <configuration>
                    <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                    <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    <stringType>String</stringType>
                </configuration>
            </execution>
        </executions>
    </plugin>

我已将以下模式放入文件夹,并使用mvn generate sources生成pojo

架构。avsc公司


    {
      "connect.name": "ps_git_repo",
      "fields": [
        {
          "name": "id",
          "type": "long"
        },
        {
          "default": null,
          "name": "name",
          "type": [
            "null",
            "string"
          ]
        }
      ],
      "name": "ps_git_repo",
      "namespace": "com.company.api.kafkademo",
      "type": "record"
    }

我得到了ps\u git\u回购。java类,然后我有这个侦听器来检索消息。

    @SpringBootApplication
    @EnableKafka
    public class KafkaDemoApplication {
    
        @KafkaListener(groupId = "test123", topics = "ps_git_repo_test")
        public void handleMessage(ps_git_repo message) {
            System.out.println(message);
        }
    
        public static void main(String[] args) {
            SpringApplication.run(KafkaDemoApplication.class, args);
        }
    
    }

找不到架构。

有人知道怎么了吗?

共有1个答案

颛孙博易
2023-03-14

反序列化程序使用了connect。名称字段而不是命名空间,以查找正确的类。

我在JDBC\U连接器的配置中添加了以下几行代码,以便连接器生成正确的名称空间

"transforms":"AddNamespace",
"transforms.AddNamespace.type":"org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.AddNamespace.schema.name": "com.company.api.kafkademo.ps_git_repo"
 类似资料:
  • 一开始,我是stackoverflow的新手,我会尽我最大的努力向您提供所有的信息,这些信息可能会帮助您帮助我解决我的问题。 因为它只是发生,当我建立的战争,我认为它可能不得不。但我不知道。 希望你能帮助我,谢谢你。

  • 开始使用AWS代码构建。 目标是让docker图像作为最终结果,并在其中运行nodejs、hapi和示例应用程序。 目前我有一个问题:“无法准备上下文:无法在Dockerfile路径中计算符号链接: lstat /tmp/src049302811/src/Dockerfile:没有这样的文件或目录”出现在BUILD阶段。 项目详情: S3存储桶用作源 存储在各自S3存储桶中的ZIP文件包含buil

  • 如果我想更新Cordova的Android项目。 更新平台版本后出现以下错误:

  • 我是刚开始使用gradle的,我遇到了一个构建错误,但我并不真正理解这个错误。我的项目只是一个空壳,有目录结构,没有java源代码。这是我的root build.gradle文件 当我执行gradle build命令时,生成失败,因为它不知道带有以下消息的testCompile方法:

  • 我下载并反编译了minecraft插件的jar文件,并通过创建一个新的java项目Import将其添加到eclipse中 这导致了: 之后,我更改了一行代码,然后尝试导出它。 我不知道我该怎么做来解决这个问题。请帮忙。

  • 我正在尝试用React-Native构建我的第一个应用程序。 我正在遵循这两个教程: https://facebook.github.io/react-native/docs/gett-started.html#content https://facebook.github.io/react-native/docs/android-setup.html 我尝试安装Android build too