Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
properties.put(StreamsConfig.CLIENT_ID_CONFIG, "cleant-id");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
StreamsBuilder builder = new StreamsBuilder();
Serde<T> pojoSerde = new SpecificAvroSerde<>();
final Map<String, String> serdeConfig = Collections.singletonMap(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
pojoSerde.configure(serdeConfig, false);
Consumed<String, Pojo> consumed = Consumed.with(Serdes.String(), pojoSerde);
KStream<String, Pojo> source = builder.stream(TopicName.STREAM1.toString(), consumed);
KTable<String, Long> storePojoCount = source
.groupBy((key, value) -> key)
.count(Materialized.as(StoreName.STORE_WORD_COUNT.toString()));
Produced<String, Long> produced = Produced.with(Serdes.String(), Serdes.Long());
storePojoCount.toStream().to(TopicName.STREAM2.toString(), produced);
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
Exception in thread "cleant-id-StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:546)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:920)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:821)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
问题是物化对象没有适当的反序列化器--Avro正在尝试反序列化KTable值,因为Avro是默认值反序列化器。它不能这样做,因为KTable值实际上是长值。
使用正确的反序列化器创建物化对象将解决问题。
protected <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> persistentStore(StoreName storeName, Serde<K> keyType, Serde<V> valueType) {
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(storeName.toString());
return Materialized.<K, V>as(storeSupplier).withKeySerde(keyType).withValueSerde(valueType);
}
任何商店供应商都可以在这里使用-这正是符合我的需要的一个。
我试图构建一个流,它获得一个Avro主题,做一个简单的转换,然后以Avro格式再次将其发送回另一个主题,我有点卡在最后的序列化部分。 我创建了一个AVRO模式,我正在导入它并使用它创建特定的AVRO Serde。但是我不知道如何使用这个serde将电影对象序列化回AVRO。 这是流类: 谢谢
目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。
我试图使用Confluent Kafka REST Proxy从我的一个主题中检索Avro格式的数据,但不幸的是,我得到了一个反序列化错误。我使用以下命令查询Kafka REST代理 我得到的回应是 Kafka Rest Proxy服务器上的日志如下: 数据是使用KafkaAvroSerializer生成的,模式在模式注册表中。还请注意,在CLI上使用avro console consumer可以
我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。 让我来解释一下我想要达到的目标: 在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。 在web上找到的大多数示例中,Avro记录都是使用已知的模式
我有一台装有Java 1.6的服务器。在这里,我需要使用Confluent的< code > KafkaAvroDeserializer 来反序列化avro消息。 问题是: 如果我使用Confluent-1.0(它与Java兼容 如果我使用Confluent-2.0或更高版本,它拥有一切,但它只与java兼容 在这种情况下我该怎么办? 为了比较: http://docs.confluent.io/
有没有一种方法让我忽略这些异常并在消费者处移动偏移量?我想,因为我使用手动偏移提交,我有这个问题。有人知道如何配置kafka-avro-serializer-6.0.0.jar来完成我想要的任务吗? 多谢了。