当前位置: 首页 > 知识库问答 >
问题:

GenericRecord的Avro模式:能够保留空白字段

戈建白
2023-03-14

我正在使用Java将JSON转换为Avro,并使用Google DataFlow将其存储到GCS。Avro模式是使用SchemaBuilder在运行时创建的。

我在模式中定义的字段之一是可选的LONG字段,它是这样定义的:

SchemaBuilder.FieldAssembler<Schema> fields = SchemaBuilder.record(mainName).fields();
Schema concreteType = SchemaBuilder.nullable().longType();
fields.name("key1").type(concreteType).noDefault();

现在,当我使用上面的模式创建GenericRecord时,并且“key1”未设置,当将结果GenericRecord放在我的DoFn的上下文中时:context.output(res);我得到以下错误:

异常在线程"main"org.apache.beam.sdk.Pipeline$PipelineExecutionException:org.apache.avro.未解析的UnionException: not in Union["long","null"]: 256

我还尝试使用with Default(0L)做同样的事情,得到了相同的结果。

我错过了什么?谢谢

共有2个答案

赵河
2023-03-14

如果我正确理解您的问题,您正在尝试接受JSON字符串并将它们保存在Cloud Storage存储桶中,在数据通过数据流时使用Avro作为数据的编码器。在我看来,您的代码没有任何明显的错误。我已经这样做了,包括将数据保存到Cloud Storage和BigQuery。

您可以考虑使用一种更简单且可能更不容易出错的方法:为您的数据定义一个Java类,并在其上使用Avro注释以使编码器能够正常工作。这是一个例子:

import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class Data {
    public long nonNullableValue;
    @Nullable public long nullableValue;
}

然后,在您的DnFn实现中使用此类型,就像您可能已经使用的那样。Beam应该能够使用Avro在工作人员之间正确移动数据,即使标记为@Nullable的字段为空。

勾起运
2023-03-14

当我尝试如下操作时,它对我很好,您可以尝试打印有助于比较的模式,也可以删除null()以供长类型尝试。

fields.name("key1").type().nullable().longType().longDefault(0);
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.avro.SchemaBuilder.RecordBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

import java.io.File;
import java.io.IOException;

public class GenericRecordExample {

  public static void main(String[] args) {

    FieldAssembler<Schema> fields;
    RecordBuilder<Schema> record = SchemaBuilder.record("Customer");
    fields = record.namespace("com.example").fields();
    fields = fields.name("first_name").type().nullable().stringType().noDefault();
    fields = fields.name("last_name").type().nullable().stringType().noDefault();
    fields = fields.name("account_number").type().nullable().longType().longDefault(0);

    Schema schema = fields.endRecord();
    System.out.println(schema.toString());

    // we build our first customer
    GenericRecordBuilder customerBuilder = new GenericRecordBuilder(schema);
    customerBuilder.set("first_name", "John");
    customerBuilder.set("last_name", "Doe");
    customerBuilder.set("account_number", 999333444111L);
    Record myCustomer = customerBuilder.build();
    System.out.println(myCustomer);

    // writing to a file
    final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
      dataFileWriter.create(myCustomer.getSchema(), new File("customer-generic.avro"));
      dataFileWriter.append(myCustomer);
      System.out.println("Written customer-generic.avro");
    } catch (IOException e) {
      System.out.println("Couldn't write file");
      e.printStackTrace();
    }

    // reading from a file
    final File file = new File("customer-generic.avro");
    final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
    GenericRecord customerRead;
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader)){
      customerRead = dataFileReader.next();
      System.out.println("Successfully read avro file");
      System.out.println(customerRead.toString());

      // get the data from the generic record
      System.out.println("First name: " + customerRead.get("first_name"));

      // read a non existent field
      System.out.println("Non existent field: " + customerRead.get("not_here"));
    }
    catch(IOException e) {
      e.printStackTrace();
    }
  }
}
 类似资料:
  • 有没有办法将 GenericRecord(我刚刚从 Kafka 消息中得到的)反序列化为嵌套 POJO?我实际上正在将其反序列化为 Scala 的案例类,但我意识到这更难。我通过互联网搜索,似乎每个人都在手动进行。您知道任何能够做到这一点的库吗?

  • 问题内容: 我对sed中的两个概念感到困惑:保持空间和模式空间。有人可以帮忙解释一下吗? 这是手册的摘要: 这六个命令确实使我感到困惑。 问题答案: 当sed逐行读取文件时,当前已读取的行将插入到 模式 缓冲区(模式空间)中。模式缓冲区类似于临时缓冲区,即存储当前信息的暂存器。当您告诉sed打印时,它将打印图案缓冲区。 保持缓冲区/保持空间就像是一个长期存储,因此您可以捕获某些东西,将其存储起来,

  • 问题内容: 每次尝试运行该程序时,我都会反复收到以下异常。 VM初始化期间发生错误 无法为对象堆保留足够的空间 无法创建Java虚拟机。 我试图增加虚拟内存(页面大小)和RAM大小,但无济于事。 如何消除此错误? 问题答案: 使用或任意数量)(或简称)运行JVM

  • 我使用Avro模式动态地从C#应用程序生成消息到Kafka集群,使用汇合的Kafka客户机。在编译时不知道数据类型,因此我使用命名空间中的GenericRecord类,如下所述:https://www.confluent.io/blog/decoupling-systems-with-apache-kafka-schema-registry-and-avro/。 但是,我有一个问题--如果模式有一

  • 这是在“Gradlew Build”期间在Android Studio中发生的。 错误:“VM初始化时发生错误,无法为3174400KB对象堆保留足够的空间”(3.18GB) 项目文件夹本身只占用1.1GB,所以我不知道这是怎么可能的。 更新:这个问题似乎已经自行解决。

  • 我已经安装了Android Studio 1.1.0。我还没有做任何事情,比如启动新的Android应用程序或导入任何东西。不知何故,它试图构建一些东西,它抛出同步错误。 错误:无法启动守护进程。此问题可能是由于守护进程配置不正确造成的。例如,使用了一个无法识别的jvm选项。请参阅http://gradle.org/docs/2.2.1/userguide/gradle_daemon.html中关