我是Spark的新手。我尝试在本地模式(windows)下使用spark java将csv文件保存为parquet。我得到了这个错误。
原因:org.apache.spark.Spark异常:写入行时任务失败
我引用了其他线程并禁用了spark推测
set("spark.speculation "," false ")
我还是会出错。我在csv中只使用了两个专栏进行测试。
输入:
portfolio_id;portfolio_code
1000042;CHNTESTPF04
1000042;CHNTESTPF04
1000042;CHNTESTPF04
1000042;CHNTESTPF04
1000042;CHNTESTPF04
1000042;CHNTESTPF04
1000042;CHNTESTPF04
我的代码:
JavaPairRDD<Integer, String> rowJavaRDD = pairRDD.mapToPair(new PairFunction<String, Integer, String>() {
private Double[] splitStringtoDoubles(String s){
String[] splitVals = s.split(";");
Double[] vals = new Double[splitVals.length];
for(int i= 0; i < splitVals.length; i++){
vals[i] = Double.parseDouble(splitVals[i]);
}
return vals;
}
@Override
public Tuple2<Integer, String> call(String arg0) throws Exception {
// TODO Auto-generated method stub
return null;
}
});
SQLContext SQLContext;
SQLContext = new org.apache.spark.sql.SQLContext(sc);
Dataset<Row> fundDF = SQLContext.createDataFrame(rowJavaRDD.values(), funds.class);
fundDF.printSchema();
fundDF.write().parquet("C:/test");
请帮助我在这里错过的东西。
在这里回答,所以它从@Glenne Helles Sindholt告诉的未回答部分离开。抱歉延迟发布此消息
我通过在Tuple2()中添加函数的split来解决错误,如下所示:
public void run(String t, String u)
{
public Tuple2<String,String> call(String rec){
String[] tokens = rec.split(";");
String[] vals = new String[tokens.length];
for(int i= 0; i < tokens.length; i++)
{
vals[i] =tokens[i];
}
return new Tuple2<String, String>(tokens[0], tokens[1]);
}
});
请找到我的Java/Sark代码,用于1)加载CSV indo Spark数据集2)将数据集保存到拼花
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("csv2parquet")
.config("spark.sql.warehouse.dir", "/file:/tmp")
.master("local")
.getOrCreate();
final String dir = "test.csv";
Dataset<Row> ds = spark.read().option("header", true).option("inferSchema", true).csv(dir);
final String parquetFile = "test.parquet";
final String codec = "parquet";
ds.write().format(codec).save(parquetFile);
spark.stop();
把这个加到你的绒球里
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.8.1</version>
</dependency>
我在一个Spark项目上工作,这里我有一个文件是在parquet格式,当我试图用java加载这个文件时,它给了我下面的错误。但是,当我用相同的路径在hive中加载相同的文件并编写查询select*from table_name时,它工作得很好,数据也很正常。关于这个问题,请帮助我。 java.io.ioException:无法读取页脚:java.lang.runtimeException:损坏的文
由于,我检查了一个spark作业的输出拼花文件,该作业总是会发出声音。我在Cloudera 5.13.1上使用了 我注意到拼花地板排的大小是不均匀的。第一排和最后一排的人很多。剩下的真的很小。。。 拼花地板工具的缩短输出,: 这是已知的臭虫吗?如何在Spark中设置拼花地板块大小(行组大小)? 编辑: Spark应用程序的作用是:它读取一个大的AVRO文件,然后通过两个分区键(使用
我有一个用例,我需要将拼花文件从Lambda保存到S3。我需要以追加模式存储Lambda函数中触发的事件。 我尝试使用Avro,但它不允许将数据以追加模式存储在同一个拼花文件中。 到目前为止,我发现只要spark允许在附加模式下将数据存储在拼花文件中…然后,我可以使用Lambda提交一个存储数据的spark作业。你认为这个可能的解决方案如何? 然而,真的不存在不使用spark的解决方案吗?预先感谢
如果我写信 临时工。拼花文件夹我得到了和行号相同的文件号 我想我不太了解拼花地板,但它是自然的吗?
使用Python 3.6在Amazon EMR集群(1个主节点,2个节点)上运行Spark 2.4.2 我正在Amazon s3中读取对象,将其压缩为拼花格式,并将其添加(附加)到现有的拼花数据存储中。当我在pyspark shell中运行代码时,我能够读取/压缩对象,并将新的拼花文件添加到现有的拼花文件中,当我对拼花数据运行查询时,它显示所有数据都在拼花文件夹中。但是,当我在EMR集群上的步骤中
我们需要每天将文本数据转换为拼花地板/avro,如果输入来自多个具有不同结构的源,我们希望使用基于spark sql的scala代码来实现这一点,而不考虑分隔符和列数或结构。