我尝试将我的自定义类型的< code>ProducerRecord发送到Kafka,但我收到错误消息:
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
我在schema:GET中设置了schema
http://localhost:8081/subjects/documentCreations-key/versions/3
回应:
{
"subject": "documentCreations-key",
"version": 3,
"id": 1,
"schema": "\"string\""}
得到
http://localhost:8081/subjects/documentCreations-value/versions/4
回答
{
"subject": "documentCreations-value",
"version": 4,
"id": 23,
"schema": "{\"type\":\"record\",\"name\":\"Document\",\"namespace\":\"com.bade\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"path\",\"type\":\"string\"}]}"
}
这是我的Scala类:
class Document(val name: java.lang.String,
val title: java.lang.String,
val path: java.lang.String)
Kafka制作人的部分:
class MyKafkaProducer {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("schema.registry.url", "http://localhost:8081")
private val producer = new KafkaProducer[java.lang.String, Document](props)
def sendCreateDocumentMessage(document: Document): RecordMetadata = {
val documentRecord = new ProducerRecord[java.lang.String, Document](SharedConfig
.documentCreationsTopic,
document.name, document)
producer.send(documentRecord).get()
}
我错过了什么?我看到我可以为我的类实现SpecificRecord,但在我阅读的书/教程中,我没有看到这一点。谢谢
编辑:固定类名
回答我自己的问题。显然,(反)序列化不是自动完成的(通过反射或其他方式),但必须从avro模式文件生成类。发布我的<code>pom。xml如果对某人有帮助:
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!--force java 8-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.1</version>
</plugin>
<plugin>
<!-- Build an executable JAR -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<archive>
<manifest>
<mainClass>Main</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>src/main/avro
</sourceDirectory>
</configuration>
</execution>
</executions>
</plugin>
<!--force discovery of generated classes-->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>target/generated-sources/avro</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<properties>
<kafka.version>1.0.0</kafka.version>
<confluent.version>4.0.0</confluent.version>
<avro.version>1.8.2</avro.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
</dependency>
</dependencies>
我使用以下mvn命令构建它:
mvn clean:clean avro:schema compiler:compile scala:compile jar:jar
我正在创建一个avro类,它包含一个字符串和一个映射作为字段。我可以通过maven生成avro类,并且能够在localhost:8081中创建注册表
主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。 我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容: 但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。 我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如: 但这似乎不是正确的方法。 是否有任何文档/示
嘿,我想将ConFluent模式注册表与Avro Serializers一起使用:留档现在基本上是说:不要为多个不同的主题使用相同的模式 谁能解释一下原因吗?我重新搜索了源代码,它基本上将模式存储在Kafka主题中,如下所示(topicname,magicbytes,version- 因此,除了冗余之外,我看不到多次使用模式的问题?
我正在尝试用Avro序列化器和模式注册表向Kafka发送一个对象。 这里是一个简化的代码: 我假设架构是从注册表“幕后”读取的,对象(用户)是序列化的,但我得到了下面的错误。 我遗漏了什么? 我必须显式读取架构并发送GenericRecord吗? org.apache.kafka.common.errors.SerializationException:序列化Avro消息 时出错,原因是:java
我正在使用Spring Cloud Stream和Confluent Schema Registry注册Avro模式。 架构注册成功。但是,当我的流侦听器接收到消息时,负载仍然以字节为单位。 这是我的财产。 在接收消息时,我注意到“AbstractAvroMessageConverter”中的“convertFromInternal”从未被调用,而这应该是用来解码消息的。
我试图使用kafka-avro-console-producer 5.4.0-ccs不自动注册模式。我试着用: 但它仍然在注册模式。属性似乎正确:https://github.com/confluentinc/schema-registry/blob/a0a04628687a72ac6d01869d881a60fbde4177e7/avro-serializer/src/main/java/io/