我在S3有一些遗留数据,我想使用Spark 2和Java API将它们转换成parquet格式。
我有所需的Avro模式(. avsc文件)及其使用Avro编译器生成的Java类,我想使用这些模式以Parque格式存储数据。输入数据不是任何标准格式,但我有一个库,可以将遗留文件中的每一行转换为Avro类。
是否可以将数据作为 JavaRDD 读取
类似于:
JavaRDD<String> rdd = javaSparkContext.textFile("s3://bucket/path_to_legacy_files");
JavaRDD<MyAvroClass> converted = rdd.map(line -> customLib.convertToAvro(line));
converted.saveAsParquet("s3://bucket/destination"); //how do I do this
像上面这样的东西可行吗?我后来想使用Hive,Presto和Spark处理转换后的镶木地板数据。
基本上是Steve提到的方法,但Hadoop输入和输出格式已经存在:
Job job = new Job();
ParquetOutputFormat.setWriteSupportClass(job, AvroWriteSupport.class);
AvroParquetOutputFormat.setSchema(job, MyAvroType.SCHEMA$);
AvroParquetOutputFormat.setBlockSize(job, 128*1024*1024);
AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
AvroParquetOutputFormat.setCompressOutput(job, true);
sparkContext.textFile("s3://bucket/path_to_legacy_files")
.map(line -> customLib.convertToAvro(line))
.mapToPair(record -> new Tuple2<Void, MyAvroType>(null, record))
.saveAsNewAPIHadoopFile(
"s3://bucket/destination",
Void.class,
MyAvroType.class,
new ParquetOutputFormat<MyAvroType>().getClass(),
job.getConfiguration());
暂时忽略S3;这是生产细节。您需要从更简单的问题“将我格式的本地文件转换为标准文件”开始。这是您可以通过针对小数据样本集的单元测试在本地实现的东西。
这在Spark中通常与Hadoop Mapduce相同:实现InputFormat的子类
有很多关于这方面的文档,以及堆栈溢出问题。在源树本身中有很多例子。
RDD是以数组[数组[字符串]的格式创建的,具有以下值: 我想用模式创建一个数据帧: 接下来的步骤: 给出以下错误:
我在Spark中有一个数据框,看起来像这样: 它有30列:只显示其中的一些! 因此,我必须在Scala中将这个数据帧转换成一个键值对,使用键作为数据帧中的一些列,并为这些键分配从索引0到计数(不同的键数)的唯一值。 例如:使用上面的案例,我希望在Scala中的map(key-value)集合中有一个输出,如下所示: 我对斯卡拉和斯帕克是新手,我试着做这样的事情。 但是,这不起作用。:/此操作完成后
我正在尝试将RDD[String]转换为数据框。字符串是逗号分隔的,所以我希望逗号之间的每个值都有一列。为此,我尝试了以下步骤: 但我明白了: 这不是这篇文章的副本(如何将rdd对象转换为火花中的数据帧),因为我要求RDD[字符串]而不是RDD[行]。 而且它也不是火花加载CSV文件作为DataFrame的副本?因为这个问题不是关于将CSV文件读取为DataFrame。
问题内容: 我正在尝试将Pandas DF转换为Spark one。DF头: 码: 我得到一个错误: 问题答案: 您需要确保您的pandas dataframe列适合spark推断的类型。如果您的熊猫数据框列出类似以下内容: 而且您遇到该错误,请尝试: 现在,确保实际上是您希望这些列成为的类型。基本上,当底层Java代码尝试从python中的对象推断类型时,它会使用一些观察值并做出猜测,如果该猜测
我正在尝试将熊猫DF转换为Spark one。测向头: 代码: 我得到了一个错误:
我有以下两个场景共享的前奏代码: 现在,我想将df转换为pyspark数据帧(