当前位置: 首页 > 知识库问答 >
问题:

保存为拼花文件在火花java

马峻
2023-03-14

我是Spark的新手。我尝试在本地模式(windows)下使用spark java将csv文件保存为parquet。我得到了这个错误。

原因:org.apache.spark.Spark异常:写入行时任务失败

我引用了其他线程并禁用了spark推测

set("spark.speculation "," false ")

我还是会出错。我在csv中只使用了两个专栏进行测试。

输入:

portfolio_id;portfolio_code
1000042;CHNTESTPF04
1000042;CHNTESTPF04
1000042;CHNTESTPF04
1000042;CHNTESTPF04
1000042;CHNTESTPF04
1000042;CHNTESTPF04
1000042;CHNTESTPF04

我的代码:

JavaPairRDD<Integer, String> rowJavaRDD = pairRDD.mapToPair(new PairFunction<String, Integer, String>() {
    private Double[] splitStringtoDoubles(String s){
       String[] splitVals = s.split(";");
       Double[] vals = new Double[splitVals.length];
       for(int i= 0; i < splitVals.length; i++){
           vals[i] = Double.parseDouble(splitVals[i]);
       }
       return vals;
    }

    @Override
    public Tuple2<Integer, String> call(String arg0) throws Exception {
        // TODO Auto-generated method stub
        return null;
    }
});


SQLContext SQLContext;
SQLContext = new org.apache.spark.sql.SQLContext(sc);

Dataset<Row> fundDF = SQLContext.createDataFrame(rowJavaRDD.values(), funds.class);
fundDF.printSchema();

fundDF.write().parquet("C:/test");

请帮助我在这里错过的东西。

共有2个答案

微生俊名
2023-03-14

在这里回答,所以它从@Glenne Helles Sindholt告诉的未回答部分离开。抱歉延迟发布此消息

我通过在Tuple2()中添加函数的split来解决错误,如下所示:

    public void run(String t, String u)

    { 

     public Tuple2<String,String> call(String rec){ 
        String[] tokens = rec.split(";"); 
        String[] vals = new String[tokens.length]; 
        for(int i= 0; i < tokens.length; i++)
       { 
            vals[i] =tokens[i]; 
        } 

      return new Tuple2<String, String>(tokens[0], tokens[1]); 

     } 
   });
邢寒
2023-03-14

请找到我的Java/Sark代码,用于1)加载CSV indo Spark数据集2)将数据集保存到拼花

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
                    .builder()
                    .appName("csv2parquet")
                    .config("spark.sql.warehouse.dir", "/file:/tmp")
                    .master("local")
                    .getOrCreate();

final String dir = "test.csv";


Dataset<Row> ds = spark.read().option("header", true).option("inferSchema", true).csv(dir);

final String parquetFile = "test.parquet";
final String codec = "parquet";

ds.write().format(codec).save(parquetFile);

spark.stop();

把这个加到你的绒球里

<dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>2.8.1</version>
</dependency>
 类似资料:
  • 我在一个Spark项目上工作,这里我有一个文件是在parquet格式,当我试图用java加载这个文件时,它给了我下面的错误。但是,当我用相同的路径在hive中加载相同的文件并编写查询select*from table_name时,它工作得很好,数据也很正常。关于这个问题,请帮助我。 java.io.ioException:无法读取页脚:java.lang.runtimeException:损坏的文

  • 由于,我检查了一个spark作业的输出拼花文件,该作业总是会发出声音。我在Cloudera 5.13.1上使用了 我注意到拼花地板排的大小是不均匀的。第一排和最后一排的人很多。剩下的真的很小。。。 拼花地板工具的缩短输出,: 这是已知的臭虫吗?如何在Spark中设置拼花地板块大小(行组大小)? 编辑: Spark应用程序的作用是:它读取一个大的AVRO文件,然后通过两个分区键(使用

  • 我有一个用例,我需要将拼花文件从Lambda保存到S3。我需要以追加模式存储Lambda函数中触发的事件。 我尝试使用Avro,但它不允许将数据以追加模式存储在同一个拼花文件中。 到目前为止,我发现只要spark允许在附加模式下将数据存储在拼花文件中…然后,我可以使用Lambda提交一个存储数据的spark作业。你认为这个可能的解决方案如何? 然而,真的不存在不使用spark的解决方案吗?预先感谢

  • 如果我写信 临时工。拼花文件夹我得到了和行号相同的文件号 我想我不太了解拼花地板,但它是自然的吗?

  • 使用Python 3.6在Amazon EMR集群(1个主节点,2个节点)上运行Spark 2.4.2 我正在Amazon s3中读取对象,将其压缩为拼花格式,并将其添加(附加)到现有的拼花数据存储中。当我在pyspark shell中运行代码时,我能够读取/压缩对象,并将新的拼花文件添加到现有的拼花文件中,当我对拼花数据运行查询时,它显示所有数据都在拼花文件夹中。但是,当我在EMR集群上的步骤中

  • 我们需要每天将文本数据转换为拼花地板/avro,如果输入来自多个具有不同结构的源,我们希望使用基于spark sql的scala代码来实现这一点,而不考虑分隔符和列数或结构。