当前位置: 首页 > 知识库问答 >
问题:

在 Spark 中将数据转换为 Parquet

鲁博瀚
2023-03-14

我在S3有一些遗留数据,我想使用Spark 2和Java API将它们转换成parquet格式。

我有所需的Avro模式(. avsc文件)及其使用Avro编译器生成的Java类,我想使用这些模式以Parque格式存储数据。输入数据不是任何标准格式,但我有一个库,可以将遗留文件中的每一行转换为Avro类。

是否可以将数据作为 JavaRDD 读取

类似于:

JavaRDD<String> rdd = javaSparkContext.textFile("s3://bucket/path_to_legacy_files");    
JavaRDD<MyAvroClass> converted = rdd.map(line -> customLib.convertToAvro(line));    
converted.saveAsParquet("s3://bucket/destination"); //how do I do this

像上面这样的东西可行吗?我后来想使用Hive,Presto和Spark处理转换后的镶木地板数据。


共有2个答案

夏令秋
2023-03-14

基本上是Steve提到的方法,但Hadoop输入和输出格式已经存在:

         Job job = new Job();
         ParquetOutputFormat.setWriteSupportClass(job, AvroWriteSupport.class);
         AvroParquetOutputFormat.setSchema(job, MyAvroType.SCHEMA$);
         AvroParquetOutputFormat.setBlockSize(job, 128*1024*1024);
         AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
         AvroParquetOutputFormat.setCompressOutput(job, true);

         sparkContext.textFile("s3://bucket/path_to_legacy_files")
            .map(line -> customLib.convertToAvro(line))
            .mapToPair(record -> new Tuple2<Void, MyAvroType>(null, record))
            .saveAsNewAPIHadoopFile(
                "s3://bucket/destination", 
                Void.class, 
                MyAvroType.class,
                new ParquetOutputFormat<MyAvroType>().getClass(), 
                job.getConfiguration());
越新霁
2023-03-14

暂时忽略S3;这是生产细节。您需要从更简单的问题“将我格式的本地文件转换为标准文件”开始。这是您可以通过针对小数据样本集的单元测试在本地实现的东西。

这在Spark中通常与Hadoop Mapduce相同:实现InputFormat的子类

有很多关于这方面的文档,以及堆栈溢出问题。在源树本身中有很多例子。

 类似资料:
  • RDD是以数组[数组[字符串]的格式创建的,具有以下值: 我想用模式创建一个数据帧: 接下来的步骤: 给出以下错误:

  • 我在Spark中有一个数据框,看起来像这样: 它有30列:只显示其中的一些! 因此,我必须在Scala中将这个数据帧转换成一个键值对,使用键作为数据帧中的一些列,并为这些键分配从索引0到计数(不同的键数)的唯一值。 例如:使用上面的案例,我希望在Scala中的map(key-value)集合中有一个输出,如下所示: 我对斯卡拉和斯帕克是新手,我试着做这样的事情。 但是,这不起作用。:/此操作完成后

  • 我正在尝试将RDD[String]转换为数据框。字符串是逗号分隔的,所以我希望逗号之间的每个值都有一列。为此,我尝试了以下步骤: 但我明白了: 这不是这篇文章的副本(如何将rdd对象转换为火花中的数据帧),因为我要求RDD[字符串]而不是RDD[行]。 而且它也不是火花加载CSV文件作为DataFrame的副本?因为这个问题不是关于将CSV文件读取为DataFrame。

  • 问题内容: 我正在尝试将Pandas DF转换为Spark one。DF头: 码: 我得到一个错误: 问题答案: 您需要确保您的pandas dataframe列适合spark推断的类型。如果您的熊猫数据框列出类似以下内容: 而且您遇到该错误,请尝试: 现在,确保实际上是您希望这些列成为的类型。基本上,当底层Java代码尝试从python中的对象推断类型时,它会使用一些观察值并做出猜测,如果该猜测

  • 我正在尝试将熊猫DF转换为Spark one。测向头: 代码: 我得到了一个错误:

  • 我有以下两个场景共享的前奏代码: 现在,我想将df转换为pyspark数据帧(