当前位置: 首页 > 知识库问答 >
问题:

如何使用Spark处理CSV数据并输出序列化的自定义JavaObjects?

鲁鹤轩
2023-03-14
public class Cat{
    private Date dateOfAdoption;
    private String name;
    private Boolean isMale;
    private Person owner;

    public Cat(String name, Boolean isMale, Person owner, Date dateOfAdoption){
       this.name = name;
       this.isMale = isMale;
       this.owner = owner;
       this.dateOfAdoption = dateOfAdoption;
    }
}

public class AdoptedCats{
    public List<Cat> catList;

    public AdoptedCats(){
    }
}
AdoptedCats adoptedCats = <the adoptedcatjavaobjectbinaryfile>
System.out.println(adoptedCats.cats.length) // this would output say, 75 if there were 75 rows in the original CSV.

我想我必须使用Java Spark...但我不确定如何创建spark脚本,以便它可以分治整个一月的所有cat目录,并将它们输出到各自的目录中。任何帮助或例子都是最有帮助的。

共有1个答案

王景山
2023-03-14

实现这一目标的步骤-

>

  • 假设文件(我有一个包含目录的小CSV文件的文件夹)是/somepath/catmydata.CSV

        JavaSparkContext context  = ......;
    
        //Retrieve list of files from master file
    
        List<String> files = context.textFile("/somepath/catmydata.csv")
                .flatMap(row->Arrays.asList(row.split(",")).iterator()).collect();
    
       //Iterate through each file and save it as object file
       Map<String,JavaRDD<Cat>> adoptedCatsMap =new HashMap<String,JavaRDD<Cat>>();
        for (String file:files) {
    
           // Extract file path and attach to convert
            String output=String.format("/convertedcatdata/%s",file.substring(file.indexOf('/'),file.length()))
    
           JavaRDD<Cat> adoptedCatsRdd = context.textFile(file).map(s->new Cat(s));
    
           adoptedCatsMap.put(output,adoptedCatsRdd);
           //This will write number of files based on rdd partitions,
           // if you want only one file in a directory then save it 
           // to temporary directory then use copyMerge to create one file using Hadoop fs FileUtil
           // https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/fs/FileUtil.html
          //Save to physical location if required.
           adoptedCatsRdd.saveAsObjectFile(output);
        }
    

    我希望这能有所帮助。

  •  类似资料:
    • 我有一个类,它通过实现中的和方法来实现自定义Kryo序列化程序(请参见下面的示例)。如何用Spark注册此自定义序列化程序? 现在在Spark: 不幸的是,Spark没有给我注册自定义序列化程序的选项。你知道有没有办法做到这一点?

    • 问题内容: 我有两个要使用Jackson序列化为JSON的Java类: 我想将Item序列化为此JSON: 用户序列化为仅包含。我还将能够将所有用户对象序列化为JSON,例如: 所以我想我需要为此编写一个自定义的序列化程序并尝试过: 我使用来自Jackson How-to:Custom Serializers的 代码对JSON进行了序列化: 但是我得到这个错误: 如何在Jackson上使用自定义序

    • 让我知道你的想法,并祝贺这一惊人的作品。在以前的项目中,我们确实使用了storm或spark流媒体,但Flink在实时流媒体分析方面遥遥领先。 谢谢,继续努力!

    • 我正在使用以下方法创建地图: 它给出了以下输出: 我需要向键添加类变量名,如下所示:

    • easyopen序列化使用fastjson处理json,xstream处理xml。现在我们来自定义实现一个json处理: 新建一个类JsonFormatter,实现ResultSerializer接口 public class JsonFormatter implements ResultSerializer { @Override public String serialize(

    • 当我创建如上图所示的UDF函数时,我得到任务序列化错误。只有在使用在集群部署模式下运行代码时,才会出现此错误。然而,它在Spark-Shell中运行良好。 我尝试添加,但没有解决问题。