当前位置: 首页 > 知识库问答 >
问题:

从JSON到Avro的Kafka流

广宏远
2023-03-14

我尝试使用Kafka流将一个带有String/JSON消息的主题转换为另一个作为Avro消息的主题。

    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class); 

    final KStreamBuilder builder = new KStreamBuilder();


    final Serde<String> stringSerde = Serdes.String();

    builder.stream(stringSerde, stringSerde, "testin")
            .mapValues(value -> AvroUtil.transform(value))
            .to("testout");

    final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.start();
public static GenericRecord transform(Object value) {

    // ... parse string/json and generate Avro object

    String userSchema = "{\"type\":\"record\"," +
            "\"name\":\"myrecord\"," +
            "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(userSchema);
    GenericRecord avroRecord = new GenericData.Record(schema);
    avroRecord.put("f1", "value1");

    return avroRecord;
}

并得到如下所示的异常:

Exception in thread "StreamThread-1" java.lang.NoSuchMethodError: com.fasterxml.jackson.annotation.JsonProperty.access()Lcom/fasterxml/jackson/annotation/JsonProperty$Access;
at com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.findPropertyAccess(JacksonAnnotationIntrospector.java:229)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder$9.withMember(POJOPropertyBuilder.java:545)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder$9.withMember(POJOPropertyBuilder.java:542)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.fromMemberAnnotationsExcept(POJOPropertyBuilder.java:996)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.findAccess(POJOPropertyBuilder.java:542)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.removeNonVisible(POJOPropertyBuilder.java:623)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._removeUnwantedAccessor(POJOPropertiesCollector.java:697)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:298)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueMethod(POJOPropertiesCollector.java:169)
at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueMethod(BasicBeanDescription.java:222)
at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:355)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:210)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:153)
at com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1203)
at com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1157)
at com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:481)
at com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:679)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:107)
at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3559)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2927)
at io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest.toJson(RegisterSchemaRequest.java:76)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:232)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:224)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:219)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:58)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:90)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)

这是正确的做法吗?我对Kafka溪流和阿夫罗是新来的

共有1个答案

夹谷斌蔚
2023-03-14

只是缺少了杰克逊的依赖:

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.7.8</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.7.8</version>
    </dependency>

现在起作用了

 类似资料:
  • 我有 kafka 集群,它从生产者那里接收 avro 事件。 我想使用flume来消费这些事件并将它们作为avro文件放在HDFS中 水槽可以吗? 有没有人有一个配置文件的例子来演示如何做? Yosi

  • 我有一个环境,我使用一个Kafka Connect Worker,它使用Oracle数据库中的一些数据,然后将其推送到Avro格式的Kafka主题中。 现在,我需要创建一个Kafka连接接收器来使用这个AVRO消息,将其转换为Json,然后将其写入Redis数据库。 到目前为止,我只能在Redis上写我在topic中使用的同样的AVRO消息。我曾尝试使用转换器,但可能误解了其用法。 吼我的工人和水

  • 我正在尝试使用kafka-avro-convore-生产者发布一条具有键(带有模式)和值(带有模式)的消息。kafka环境(kafka的conFluent 6.2.0版本、连接、zoomaster、模式注册表)都正确启动,我可以确认我的连接器已安装。问题是当我发送消息时,我的Sink连接器失败并出现我无法诊断的错误。 感谢您的帮助: 我生成一条AVRO消息,如下所示: 并在连接日志中接收以下错误:

  • 我正在使用confluent JDBC连接器连接到postgres数据库,以检索更改并将其放在Kafka主题中。现在,我想使用spring boot消费者来使用这些消息。这些消息采用AVRO格式。我从连接器中获得了模式,并使用avro-maven插件为其生成了一个POJO类。 但是当侦听器启动时,只有以下错误 当我不使用avro对数据进行反序列化时,我会收到数据但不可读。 在pom中。xml我有以

  • 我是Scala和Apache Flink的初学者,但到目前为止,一切都很顺利。我正在尝试使用Flink应用程序中序列化到AVRO的Kafka事件。我阅读了文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-反序列化模式)和google搜索了很多小时,但我仍然在同一页上。我有一

  • 我为记录创建了一个Avro模式,我们将其发布到Kafka主题中。我们实际的Kafka记录模式更复杂,但为了简洁起见,我只附上了相关部分。我们在记录中有多个嵌套子类,但由于某些原因,我在尝试发布记录时遇到以下异常(包名称已被掩盖): 这是我定义的Avro模式的当前子集。 我们的对象(KafkaRecord)的JSON表示如下: 我似乎不明白为什么Avro不喜欢这个嵌套的记录,我更喜欢不剥离这些嵌套的