schema = new org.apache.avro.Schema.Parser().parse("{\n" +
" \"type\": \"record\",\n" +
" \"namespace\": \"com.example\",\n" +
" \"name\": \"Patterns\",\n" +
" \"fields\": [\n" +
" { \"name\": \"id\", \"type\": \"string\" },\n" +
" { \"name\": \"name\", \"type\": \"string\" },\n" +
" { \"name\": \"createdAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" +
" { \"name\": \"updatedAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" +
" { \"name\": \"steps\", \"type\": [\"null\",{\"type\":\"array\",\"items\":{\"type\":\"string\",\"name\":\"json\"}}] },\n" +
" ]\n" +
"}");
Pipeline p = Pipeline.create(
PipelineOptionsFactory.fromArgs(args).withValidation().create());
p.apply(JdbcIO.<GenericRecord> read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"org.postgresql.Driver", "jdbc:postgresql://localhost:port/database")
.withUsername("username")
.withPassword("password"))
.withQuery("select * from table limit(10)")
.withCoder(AvroCoder.of(schema))
.withRowMapper((JdbcIO.RowMapper<GenericRecord>) resultSet -> {
GenericRecord record = new GenericData.Record(schema);
ResultSetMetaData metadata = resultSet.getMetaData();
int columnsNumber = metadata.getColumnCount();
for(int i=0; i<columnsNumber; i++) {
Object columnValue = resultSet.getObject(i+1);
if(columnValue instanceof UUID) columnValue=columnValue.toString();
if(columnValue instanceof Timestamp) columnValue=columnValue.toString();
if(columnValue instanceof PgArray) {
Object[] array = (Object[]) ((PgArray) columnValue).getArray();
List list=new ArrayList();
for (Object d : array) {
if(d instanceof PGobject) {
list.add(((PGobject) d).getValue());
}
}
columnValue = list;
}
record.put(i, columnValue);
}
return record;
}))
.apply(FileIO.<GenericRecord>write()
.via(ParquetIO.sink(schema).withCompressionCodec(CompressionCodecName.SNAPPY))
.to("something.parquet")
);
p.run();
message com.example.table {
required binary id (UTF8);
required binary name (UTF8);
required binary createdAt (UTF8);
required binary updatedAt (UTF8);
optional group someArray (LIST) {
repeated binary array (UTF8);
}
}
message com.example.table {
required binary id (UTF8);
required binary name (UTF8);
required binary createdAt (UTF8);
required binary updatedAt (UTF8);
optional repeated binary someArray(UTF8);
}
请帮忙
我没有找到从Avro创建不在GroupType中的重复元素的方法。
Beam中的ParquetIO使用Parquet-MR
项目中定义的“标准”avro转换,在此实现。
似乎有两种方法可以将Avro数组字段转换为Parquet消息--但这两种方法都不会创建您要查找的内容。
我将JSON文件&JSON模式解析为AVRO模式。我有点困惑,我是否必须使用AVRO文档中定义的数据类型来编写手动AVRO模式。 或者是否有任何自动化的方法/函数/程序可以完全按照要求工作?
我正在从Cloudera包裹中运行带有Spark 0.9.0的CDH 4.4。 我有一堆Avro文件是通过Pig的AvroStorage UDF创建的。我想在 Spark 中加载这些文件,使用通用记录或载入 Avro 文件的架构。到目前为止,我已经尝试过这个: 这适用于一个文件,但它不能扩展——我将所有数据加载到本地RAM中,然后从那里跨spark节点分发。
Apache Drill有一个很好的功能,可以从许多传入的数据集中制作镶木地板文件,但是似乎没有太多关于如何稍后使用这些镶木地板文件的信息 - 特别是在Hive中。 Hive有没有办法利用这些“1_0_0.parquet”等文件?也许创建一个表并从拼花文件加载数据,或者创建一个表并以某种方式将这些拼花文件放在hdfs中,以便Hive读取它?
我确实有这样的身体- 有人能帮我为这种类型的主体创建一个有效的Avro模式吗。我找到了一个创建类似这样的嵌套模式的示例- }] 当我提供这种模式时,它在下面的行中为我提供了错误- GenericRecord avroRecord=新的GenericData。记录(模式); 错误是-org。阿帕奇。阿夫罗。AvroRuntimeException:不是记录架构:
我需要在Swift中创建一个singleton类。谁能帮我查一下密码吗?我已经知道,单例类在创建泛型代码方面非常有用。
上面应该在我的SD卡中创建一个文件夹,如果它不存在,则不要做任何事情。虽然吐司根据条件工作,但它不存在时不会创建目录。知道如何解决吗? 我的清单如下所示: 更新:我更新了清单和代码,但它仍然没有在我的SD卡中创建文件夹。请记住,我正在使用Eclipse并将应用程序直接运行到我的手机(GNex VZW),而不是使用AVD。