public class Cat{
private Date dateOfAdoption;
private String name;
private Boolean isMale;
private Person owner;
public Cat(String name, Boolean isMale, Person owner, Date dateOfAdoption){
this.name = name;
this.isMale = isMale;
this.owner = owner;
this.dateOfAdoption = dateOfAdoption;
}
}
public class AdoptedCats{
public List<Cat> catList;
public AdoptedCats(){
}
}
AdoptedCats adoptedCats = <the adoptedcatjavaobjectbinaryfile>
System.out.println(adoptedCats.cats.length) // this would output say, 75 if there were 75 rows in the original CSV.
我想我必须使用Java Spark...但我不确定如何创建spark脚本,以便它可以分治整个一月的所有cat目录,并将它们输出到各自的目录中。任何帮助或例子都是最有帮助的。
实现这一目标的步骤-
>
假设文件(我有一个包含目录的小CSV文件的文件夹)是/somepath/catmydata.CSV
JavaSparkContext context = ......;
//Retrieve list of files from master file
List<String> files = context.textFile("/somepath/catmydata.csv")
.flatMap(row->Arrays.asList(row.split(",")).iterator()).collect();
//Iterate through each file and save it as object file
Map<String,JavaRDD<Cat>> adoptedCatsMap =new HashMap<String,JavaRDD<Cat>>();
for (String file:files) {
// Extract file path and attach to convert
String output=String.format("/convertedcatdata/%s",file.substring(file.indexOf('/'),file.length()))
JavaRDD<Cat> adoptedCatsRdd = context.textFile(file).map(s->new Cat(s));
adoptedCatsMap.put(output,adoptedCatsRdd);
//This will write number of files based on rdd partitions,
// if you want only one file in a directory then save it
// to temporary directory then use copyMerge to create one file using Hadoop fs FileUtil
// https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/fs/FileUtil.html
//Save to physical location if required.
adoptedCatsRdd.saveAsObjectFile(output);
}
我希望这能有所帮助。
我有一个类,它通过实现中的和方法来实现自定义Kryo序列化程序(请参见下面的示例)。如何用Spark注册此自定义序列化程序? 现在在Spark: 不幸的是,Spark没有给我注册自定义序列化程序的选项。你知道有没有办法做到这一点?
问题内容: 我有两个要使用Jackson序列化为JSON的Java类: 我想将Item序列化为此JSON: 用户序列化为仅包含。我还将能够将所有用户对象序列化为JSON,例如: 所以我想我需要为此编写一个自定义的序列化程序并尝试过: 我使用来自Jackson How-to:Custom Serializers的 代码对JSON进行了序列化: 但是我得到这个错误: 如何在Jackson上使用自定义序
让我知道你的想法,并祝贺这一惊人的作品。在以前的项目中,我们确实使用了storm或spark流媒体,但Flink在实时流媒体分析方面遥遥领先。 谢谢,继续努力!
我正在使用以下方法创建地图: 它给出了以下输出: 我需要向键添加类变量名,如下所示:
当我创建如上图所示的UDF函数时,我得到任务序列化错误。只有在使用在集群部署模式下运行代码时,才会出现此错误。然而,它在Spark-Shell中运行良好。 我尝试添加,但没有解决问题。
我在Scala/Spark(1.5)和齐柏林飞艇上遇到了一个奇怪的问题: 如果我运行以下Scala/Spark代码,它将正常运行: 但是,在声明了此处建议的自定义数据帧类型之后 使用它的例子如下: 这次运行成功。 现在如果我再次运行下面的代码(同上) 我收到了错误信息: rdd:org。阿帕奇。火花rdd。RDD[Int]=ParallelCollectionRDD[8]位于parallelize