当前位置: 首页 > 知识库问答 >
问题:

如何用Avro在parquet文件模式中创建重复类型?

包承望
2023-03-14
schema = new org.apache.avro.Schema.Parser().parse("{\n" +
         "     \"type\": \"record\",\n" +
         "     \"namespace\": \"com.example\",\n" +
         "     \"name\": \"Patterns\",\n" +
         "     \"fields\": [\n" +
         "       { \"name\": \"id\", \"type\": \"string\" },\n" +
         "       { \"name\": \"name\", \"type\": \"string\" },\n" +
         "       { \"name\": \"createdAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" +
         "       { \"name\": \"updatedAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" +
         "       { \"name\": \"steps\", \"type\": [\"null\",{\"type\":\"array\",\"items\":{\"type\":\"string\",\"name\":\"json\"}}] },\n" +
         "     ]\n" +
         "}");
Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

p.apply(JdbcIO.<GenericRecord> read()
       .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
             "org.postgresql.Driver", "jdbc:postgresql://localhost:port/database")
             .withUsername("username")
             .withPassword("password"))
       .withQuery("select * from table limit(10)")
       .withCoder(AvroCoder.of(schema))
       .withRowMapper((JdbcIO.RowMapper<GenericRecord>) resultSet -> {
            GenericRecord record = new GenericData.Record(schema);
            ResultSetMetaData metadata = resultSet.getMetaData();
            int columnsNumber = metadata.getColumnCount();
            for(int i=0; i<columnsNumber; i++) {
                Object columnValue = resultSet.getObject(i+1);
                if(columnValue instanceof UUID) columnValue=columnValue.toString();
                if(columnValue instanceof Timestamp) columnValue=columnValue.toString();
                if(columnValue instanceof PgArray) {
                    Object[] array = (Object[]) ((PgArray) columnValue).getArray();
                    List list=new ArrayList();
                    for (Object d : array) {
                        if(d instanceof PGobject) {
                            list.add(((PGobject) d).getValue());
                        }
                    }
                    columnValue = list;
                 }
                 record.put(i, columnValue);
            }
            return record;
        }))
  .apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema).withCompressionCodec(CompressionCodecName.SNAPPY))
        .to("something.parquet")
  );

p.run();
message com.example.table {
  required binary id (UTF8);
  required binary name (UTF8);
  required binary createdAt (UTF8);
  required binary updatedAt (UTF8);
  optional group someArray (LIST) {
    repeated binary array (UTF8);
  }
}

message com.example.table {
  required binary id (UTF8);
  required binary name (UTF8);  
  required binary createdAt (UTF8);
  required binary updatedAt (UTF8);
  optional repeated binary someArray(UTF8);
}

请帮忙

共有1个答案

徐文斌
2023-03-14

我没有找到从Avro创建不在GroupType中的重复元素的方法。

Beam中的ParquetIO使用Parquet-MR项目中定义的“标准”avro转换,在此实现。

似乎有两种方法可以将Avro数组字段转换为Parquet消息--但这两种方法都不会创建您要查找的内容。

 类似资料:
  • 我将JSON文件&JSON模式解析为AVRO模式。我有点困惑,我是否必须使用AVRO文档中定义的数据类型来编写手动AVRO模式。 或者是否有任何自动化的方法/函数/程序可以完全按照要求工作?

  • 我正在从Cloudera包裹中运行带有Spark 0.9.0的CDH 4.4。 我有一堆Avro文件是通过Pig的AvroStorage UDF创建的。我想在 Spark 中加载这些文件,使用通用记录或载入 Avro 文件的架构。到目前为止,我已经尝试过这个: 这适用于一个文件,但它不能扩展——我将所有数据加载到本地RAM中,然后从那里跨spark节点分发。

  • Apache Drill有一个很好的功能,可以从许多传入的数据集中制作镶木地板文件,但是似乎没有太多关于如何稍后使用这些镶木地板文件的信息 - 特别是在Hive中。 Hive有没有办法利用这些“1_0_0.parquet”等文件?也许创建一个表并从拼花文件加载数据,或者创建一个表并以某种方式将这些拼花文件放在hdfs中,以便Hive读取它?

  • 我确实有这样的身体- 有人能帮我为这种类型的主体创建一个有效的Avro模式吗。我找到了一个创建类似这样的嵌套模式的示例- }] 当我提供这种模式时,它在下面的行中为我提供了错误- GenericRecord avroRecord=新的GenericData。记录(模式); 错误是-org。阿帕奇。阿夫罗。AvroRuntimeException:不是记录架构:

  • 我需要在Swift中创建一个singleton类。谁能帮我查一下密码吗?我已经知道,单例类在创建泛型代码方面非常有用。

  • 上面应该在我的SD卡中创建一个文件夹,如果它不存在,则不要做任何事情。虽然吐司根据条件工作,但它不存在时不会创建目录。知道如何解决吗? 我的清单如下所示: 更新:我更新了清单和代码,但它仍然没有在我的SD卡中创建文件夹。请记住,我正在使用Eclipse并将应用程序直接运行到我的手机(GNex VZW),而不是使用AVD。