主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。
我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容:
PCollection<KV<Long,String>> input = p.apply(KafkaIO.<Long,
String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
PCollection<String> output = input.apply(Values.<String>create());
但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。
我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如:
PCollection<MyClass> output = input.apply(Values.<MyClass>create());
但这似乎不是正确的方法。
是否有任何文档/示例可以向我指出,以便我了解您将如何与Kafka AVRO和Beam合作?
我已更新我的代码:
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
public class Main {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
);
p.run();
}
}
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
@DefaultCoder(AvroCoder.class)
public class Myclass{
String name;
String age;
Myclass(){}
Myclass(String n, String a) {
this.name= n;
this.age= a;
}
}
但是我现在得到了下面的错误
incompatible types: java.lang.Class < io.confluent.kafka.serializers.KafkaAvroDeserializer > cannot be converted to java.lang.Class < ? extends org.apache.kafka.common.serialization.Deserializer < java.lang.String > >
我一定是导入了不正确的序列化程序?
洋平的回答不错,但是我也发现这个管用
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
...
public static class CustomKafkaAvroDeserializer extends SpecificAvroDeserializer<MyCustomClass> {}
...
.withValueDeserializerAndCoder(CustomKafkaAvroDeserializer.class, AvroCoder.of(MyCustomClass.class))
...
其中< code>MyCustomClass是用Avro工具生成的代码。
您可以按如下方式使用KafkaAvroDeserializer:
PCollection<KV<Long,MyClass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
其中MyClass是POJO类生成的Avro模式。
确保您的POJO类具有注释AvroCoder,如下例所示:
@DefaultCoder(AvroCoder.class)
public class MyClass{
String name;
String age;
MyClass(){}
MyClass(String n, String a) {
this.name= n;
this.age= a;
}
}
我也面临同样的问题。在这个邮件档案中找到了解决方案。http://mail-archives . Apache . org/mod _ mbox/beam-user/201710 . mbox/< CAMsy _ ni vrt _ 9 _ xfxotk 1 inh XB = x _ yadbcbbn 4aquu_hn0GJ0nA@mail.gmail.com >
在您的情况下,您需要定义自己的反序列化器
public class MyClassKafkaAvroDeserializer extends
AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
configure(new KafkaAvroDeserializerConfig(configs));
}
@Override
public MyClass deserialize(String s, byte[] bytes) {
return (MyClass) this.deserialize(bytes);
}
@Override
public void close() {} }
然后将您的KafkaAvroDeserializer指定为ValueDeserializer。
p.apply(KafkaIO.<Long, MyClass>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(MyClassKafkaAvroDeserializer.class) );
我试图将Kafka中的Avro消息反序列化为从Avro模式生成的POJO。我正在使用Kafkaavroderializer进行此转换。 我可以在
我试图通过Spring KafkaListener在单独的消费者应用程序中使用这些消息 集装箱工厂配置 在这种配置下,使用者不接收消息(字节)。如果我将Kafka侦听器更改为接受字符串,则会出现以下异常: 集装箱工厂配置 制片人-
我试图阅读和打印从Kafka使用Apache Flink的原型消息。 我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/ Flink消费者代码是: 反序列化器代码是:
问题内容: 我是C ++的新手。使用序列化和反序列化类型数据的最简单方法是什么。我发现了一些使用示例,但它们对我来说是晦涩的。 问题答案: 请注意,将键解释为路径,例如,将对“ ab” =“ z”放置将创建{“ a”:{“ b”:“ z”}} JSON,而不是{“ ab”:“ z”} 。否则,使用是微不足道的。这是一个小例子。
我需要一个自定义反序列化器来在复杂的POJO中转换字符串。反序列化工作直到使用反序列化器:特别是使用自定义反序列化器时,我的对象的非对象属性不会序列化。 我有一个restful Web服务,它有一个pojo作为参数。 所以我的类PreentivoWs需要一个方法。这里是类定义: 在jsonObject中,我有一个枚举定义为 但此对象需要转换反序列化程序: 并在财产上标注: fromString方法
我想将json反序列化到类Foo: IBar有两个实现,但是当反序列化时,我总是想使用第一个实现。(理想情况下,这将使问题变得更容易,因为不需要运行时类型检查) 我相信我可以编写自定义反序列化程序,但我觉得一定有更简单的方法。 我找到了这个注释,它在没有列表的情况下非常有效。