当前位置: 首页 > 知识库问答 >
问题:

使用Apache Beam反序列化Kafka AVRO消息

仲浩旷
2023-03-14

主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据

我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容:

PCollection<KV<Long,String>> input = p.apply(KafkaIO.<Long, 
String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
  
PCollection<String> output = input.apply(Values.<String>create());

但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。

我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如:

PCollection<MyClass> output = input.apply(Values.<MyClass>create());

但这似乎不是正确的方法。

是否有任何文档/示例可以向我指出,以便我了解您将如何与Kafka AVRO和Beam合作?

我已更新我的代码:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;

public class Main {

public static void main(String[] args) {

    PipelineOptions options = PipelineOptionsFactory.create();

    Pipeline p = Pipeline.create(options);

    PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
    );

    p.run();

}
}
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class Myclass{
String name;
String age;

Myclass(){}
Myclass(String n, String a) {
    this.name= n;
    this.age= a;
}
}

但是我现在得到了下面的错误

incompatible types: java.lang.Class < io.confluent.kafka.serializers.KafkaAvroDeserializer > cannot be converted to java.lang.Class < ? extends org.apache.kafka.common.serialization.Deserializer < java.lang.String > >

我一定是导入了不正确的序列化程序?

共有3个答案

辛才俊
2023-03-14
匿名用户

洋平的回答不错,但是我也发现这个管用

import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;

...

public static class CustomKafkaAvroDeserializer extends SpecificAvroDeserializer<MyCustomClass> {}

...
.withValueDeserializerAndCoder(CustomKafkaAvroDeserializer.class, AvroCoder.of(MyCustomClass.class))
...

其中< code>MyCustomClass是用Avro工具生成的代码。

丁震博
2023-03-14

您可以按如下方式使用KafkaAvroDeserializer:

PCollection<KV<Long,MyClass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
  .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))

其中MyClass是POJO类生成的Avro模式

确保您的POJO类具有注释AvroCoder,如下例所示:

@DefaultCoder(AvroCoder.class)
   public class MyClass{
      String name;
      String age;

      MyClass(){}
      MyClass(String n, String a) {
         this.name= n;
         this.age= a;
      }
  }
邢宏浚
2023-03-14

我也面临同样的问题。在这个邮件档案中找到了解决方案。http://mail-archives . Apache . org/mod _ mbox/beam-user/201710 . mbox/< CAMsy _ ni vrt _ 9 _ xfxotk 1 inh XB = x _ yadbcbbn 4aquu_hn0GJ0nA@mail.gmail.com >

在您的情况下,您需要定义自己的反序列化器

public class MyClassKafkaAvroDeserializer extends
  AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {
  
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
      configure(new KafkaAvroDeserializerConfig(configs));
  }

  @Override
  public MyClass deserialize(String s, byte[] bytes) {
      return (MyClass) this.deserialize(bytes);
  }

  @Override
  public void close() {} }

然后将您的KafkaAvroDeserializer指定为ValueDeserializer。

p.apply(KafkaIO.<Long, MyClass>read()
 .withKeyDeserializer(LongDeserializer.class)
 .withValueDeserializer(MyClassKafkaAvroDeserializer.class) );

 类似资料:
  • 我试图将Kafka中的Avro消息反序列化为从Avro模式生成的POJO。我正在使用Kafkaavroderializer进行此转换。 我可以在

  • 我试图通过Spring KafkaListener在单独的消费者应用程序中使用这些消息 集装箱工厂配置 在这种配置下,使用者不接收消息(字节)。如果我将Kafka侦听器更改为接受字符串,则会出现以下异常: 集装箱工厂配置 制片人-

  • 我试图阅读和打印从Kafka使用Apache Flink的原型消息。 我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/ Flink消费者代码是: 反序列化器代码是:

  • 问题内容: 我是C ++的新手。使用序列化和反序列化类型数据的最简单方法是什么。我发现了一些使用示例,但它们对我来说是晦涩的。 问题答案: 请注意,将键解释为路径,例如,将对“ ab” =“ z”放置将创建{“ a”:{“ b”:“ z”}} JSON,而不是{“ ab”:“ z”} 。否则,使用是微不足道的。这是一个小例子。

  • 我需要一个自定义反序列化器来在复杂的POJO中转换字符串。反序列化工作直到使用反序列化器:特别是使用自定义反序列化器时,我的对象的非对象属性不会序列化。 我有一个restful Web服务,它有一个pojo作为参数。 所以我的类PreentivoWs需要一个方法。这里是类定义: 在jsonObject中,我有一个枚举定义为 但此对象需要转换反序列化程序: 并在财产上标注: fromString方法

  • 我想将json反序列化到类Foo: IBar有两个实现,但是当反序列化时,我总是想使用第一个实现。(理想情况下,这将使问题变得更容易,因为不需要运行时类型检查) 我相信我可以编写自定义反序列化程序,但我觉得一定有更简单的方法。 我找到了这个注释,它在没有列表的情况下非常有效。