我尝试使用Kafka流将一个带有String/JSON消息的主题转换为另一个作为Avro消息的主题。
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
final KStreamBuilder builder = new KStreamBuilder();
final Serde<String> stringSerde = Serdes.String();
builder.stream(stringSerde, stringSerde, "testin")
.mapValues(value -> AvroUtil.transform(value))
.to("testout");
final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
public static GenericRecord transform(Object value) {
// ... parse string/json and generate Avro object
String userSchema = "{\"type\":\"record\"," +
"\"name\":\"myrecord\"," +
"\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");
return avroRecord;
}
并得到如下所示的异常:
Exception in thread "StreamThread-1" java.lang.NoSuchMethodError: com.fasterxml.jackson.annotation.JsonProperty.access()Lcom/fasterxml/jackson/annotation/JsonProperty$Access;
at com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.findPropertyAccess(JacksonAnnotationIntrospector.java:229)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder$9.withMember(POJOPropertyBuilder.java:545)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder$9.withMember(POJOPropertyBuilder.java:542)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.fromMemberAnnotationsExcept(POJOPropertyBuilder.java:996)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.findAccess(POJOPropertyBuilder.java:542)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.removeNonVisible(POJOPropertyBuilder.java:623)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._removeUnwantedAccessor(POJOPropertiesCollector.java:697)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:298)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueMethod(POJOPropertiesCollector.java:169)
at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueMethod(BasicBeanDescription.java:222)
at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:355)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:210)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:153)
at com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1203)
at com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1157)
at com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:481)
at com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:679)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:107)
at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3559)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2927)
at io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest.toJson(RegisterSchemaRequest.java:76)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:232)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:224)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:219)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:58)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:90)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
这是正确的做法吗?我对Kafka溪流和阿夫罗是新来的
只是缺少了杰克逊的依赖:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.7.8</version>
</dependency>
现在起作用了
我有 kafka 集群,它从生产者那里接收 avro 事件。 我想使用flume来消费这些事件并将它们作为avro文件放在HDFS中 水槽可以吗? 有没有人有一个配置文件的例子来演示如何做? Yosi
我有一个环境,我使用一个Kafka Connect Worker,它使用Oracle数据库中的一些数据,然后将其推送到Avro格式的Kafka主题中。 现在,我需要创建一个Kafka连接接收器来使用这个AVRO消息,将其转换为Json,然后将其写入Redis数据库。 到目前为止,我只能在Redis上写我在topic中使用的同样的AVRO消息。我曾尝试使用转换器,但可能误解了其用法。 吼我的工人和水
我正在尝试使用kafka-avro-convore-生产者发布一条具有键(带有模式)和值(带有模式)的消息。kafka环境(kafka的conFluent 6.2.0版本、连接、zoomaster、模式注册表)都正确启动,我可以确认我的连接器已安装。问题是当我发送消息时,我的Sink连接器失败并出现我无法诊断的错误。 感谢您的帮助: 我生成一条AVRO消息,如下所示: 并在连接日志中接收以下错误:
我正在使用confluent JDBC连接器连接到postgres数据库,以检索更改并将其放在Kafka主题中。现在,我想使用spring boot消费者来使用这些消息。这些消息采用AVRO格式。我从连接器中获得了模式,并使用avro-maven插件为其生成了一个POJO类。 但是当侦听器启动时,只有以下错误 当我不使用avro对数据进行反序列化时,我会收到数据但不可读。 在pom中。xml我有以
我是Scala和Apache Flink的初学者,但到目前为止,一切都很顺利。我正在尝试使用Flink应用程序中序列化到AVRO的Kafka事件。我阅读了文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-反序列化模式)和google搜索了很多小时,但我仍然在同一页上。我有一
我为记录创建了一个Avro模式,我们将其发布到Kafka主题中。我们实际的Kafka记录模式更复杂,但为了简洁起见,我只附上了相关部分。我们在记录中有多个嵌套子类,但由于某些原因,我在尝试发布记录时遇到以下异常(包名称已被掩盖): 这是我定义的Avro模式的当前子集。 我们的对象(KafkaRecord)的JSON表示如下: 我似乎不明白为什么Avro不喜欢这个嵌套的记录,我更喜欢不剥离这些嵌套的