当前位置: 首页 > 知识库问答 >
问题:

Kafka:使用架构注册表序列化Avro消息时出错

章永安
2023-03-14

我尝试将我的自定义类型的< code>ProducerRecord发送到Kafka,但我收到错误消息:

Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord

我在schema:GET中设置了schema

http://localhost:8081/subjects/documentCreations-key/versions/3

回应:

{
"subject": "documentCreations-key",
"version": 3,
"id": 1,
"schema": "\"string\""}

得到

http://localhost:8081/subjects/documentCreations-value/versions/4

回答

{
"subject": "documentCreations-value",
"version": 4,
"id": 23,
"schema": "{\"type\":\"record\",\"name\":\"Document\",\"namespace\":\"com.bade\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"path\",\"type\":\"string\"}]}"

}

这是我的Scala类:

class Document(val name: java.lang.String,
               val title: java.lang.String,
               val path: java.lang.String)

Kafka制作人的部分:

class MyKafkaProducer {

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("schema.registry.url", "http://localhost:8081")

  private val producer = new KafkaProducer[java.lang.String, Document](props)

  def sendCreateDocumentMessage(document: Document): RecordMetadata = {

    val documentRecord = new ProducerRecord[java.lang.String, Document](SharedConfig
      .documentCreationsTopic,
      document.name, document)

    producer.send(documentRecord).get()
  }

我错过了什么?我看到我可以为我的类实现SpecificRecord,但在我阅读的书/教程中,我没有看到这一点。谢谢

编辑:固定类名

共有1个答案

关志勇
2023-03-14

回答我自己的问题。显然,(反)序列化不是自动完成的(通过反射或其他方式),但必须从avro模式文件生成类。发布我的<code>pom。xml如果对某人有帮助:

<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>

    <plugins>

        <!--force java 8-->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>

        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.3.1</version>
        </plugin>

        <plugin>
            <!-- Build an executable JAR -->
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>3.0.2</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>Main</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>${avro.version}</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                        <goal>protocol</goal>
                        <goal>idl-protocol</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>src/main/avro
                        </sourceDirectory>
                    </configuration>

                </execution>
            </executions>
        </plugin>
        <!--force discovery of generated classes-->
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>build-helper-maven-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <id>add-source</id>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>add-source</goal>
                    </goals>
                    <configuration>
                        <sources>
                            <source>target/generated-sources/avro</source>
                        </sources>
                    </configuration>
                </execution>
            </executions>
        </plugin>

    </plugins>
</build>

<repositories>
    <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven/</url>
    </repository>
</repositories>

<properties>
    <kafka.version>1.0.0</kafka.version>
    <confluent.version>4.0.0</confluent.version>
    <avro.version>1.8.2</avro.version>
</properties>

<dependencies>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>${kafka.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>

    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>${confluent.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>${avro.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>${avro.version}</version>
    </dependency>

</dependencies>

我使用以下mvn命令构建它:

mvn clean:clean avro:schema compiler:compile scala:compile jar:jar
 类似资料:
  • 我正在创建一个avro类,它包含一个字符串和一个映射作为字段。我可以通过maven生成avro类,并且能够在localhost:8081中创建注册表

  • 主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。 我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容: 但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。 我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如: 但这似乎不是正确的方法。 是否有任何文档/示

  • 嘿,我想将ConFluent模式注册表与Avro Serializers一起使用:留档现在基本上是说:不要为多个不同的主题使用相同的模式 谁能解释一下原因吗?我重新搜索了源代码,它基本上将模式存储在Kafka主题中,如下所示(topicname,magicbytes,version- 因此,除了冗余之外,我看不到多次使用模式的问题?

  • 我正在尝试用Avro序列化器和模式注册表向Kafka发送一个对象。 这里是一个简化的代码: 我假设架构是从注册表“幕后”读取的,对象(用户)是序列化的,但我得到了下面的错误。 我遗漏了什么? 我必须显式读取架构并发送GenericRecord吗? org.apache.kafka.common.errors.SerializationException:序列化Avro消息 时出错,原因是:java

  • 我正在使用Spring Cloud Stream和Confluent Schema Registry注册Avro模式。 架构注册成功。但是,当我的流侦听器接收到消息时,负载仍然以字节为单位。 这是我的财产。 在接收消息时,我注意到“AbstractAvroMessageConverter”中的“convertFromInternal”从未被调用,而这应该是用来解码消息的。

  • 我试图使用kafka-avro-console-producer 5.4.0-ccs不自动注册模式。我试着用: 但它仍然在注册模式。属性似乎正确:https://github.com/confluentinc/schema-registry/blob/a0a04628687a72ac6d01869d881a60fbde4177e7/avro-serializer/src/main/java/io/