当前位置: 首页 > 面试题库 >

使用Apache Spark和Java将CSV解析为DataFrame / DataSet

太叔高义
2023-03-14
问题内容

我是新手,我想使用group-by&reduce从CSV中查找以下内容(按聘用者一行):

      Department, Designation, costToCompany, State
      Sales, Trainee, 12000, UP
      Sales, Lead, 32000, AP
      Sales, Lead, 32000, LA
      Sales, Lead, 32000, TN
      Sales, Lead, 32000, AP
      Sales, Lead, 32000, TN 
      Sales, Lead, 32000, LA
      Sales, Lead, 32000, LA
      Marketing, Associate, 18000, TN
      Marketing, Associate, 18000, TN
      HR, Manager, 58000, TN

我想通过按 部门,指定,州* 和其他列(以 sum(costToCompany)TotalEmployeeCount
来简化CSV 格式)
*

应该得到如下结果:

      Dept, Desg, state, empCount, totalCost
      Sales,Lead,AP,2,64000
      Sales,Lead,LA,3,96000  
      Sales,Lead,TN,2,64000

有什么方法可以使用转换和操作来实现这一目标。还是我们应该进行RDD操作?


问题答案:

程序

  • 创建一个类(模式)以封装您的结构(方法B不是必需的,但是如果使用Java,它将使您的代码更易于阅读)

    public class Record implements Serializable {
    

    String department;
    String designation;
    long costToCompany;
    String state;
    // constructor , getters and setters
    }

  • 加载CVS(JSON)文件

        JavaSparkContext sc;
    JavaRDD<String> data = sc.textFile("path/input.csv");
    //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions 
    SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified


    JavaRDD<Record> rdd_records = sc.textFile(data).map(
      new Function<String, Record>() {
          public Record call(String line) throws Exception {
             // Here you can use JSON
             // Gson gson = new Gson();
             // gson.fromJson(line, Record.class);
             String[] fields = line.split(",");
             Record sd = new Record(fields[0], fields[1], fields[2].trim(), fields[3]);
             return sd;
          }
    });

此时,您有2种方法:

A.SparkSQL

  • 注册一个表(使用您定义的模式类)
        JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
    table.registerAsTable("record_table");
    table.printSchema();
  • 用所需的查询分组查询表
        JavaSchemaRDD res = sqlContext.sql("
      select department,designation,state,sum(costToCompany),count(*) 
      from record_table 
      group by department,designation,state
    ");
  • 在这里,您还可以使用SQL方法执行所需的任何其他查询

火花

  • 使用复合密钥映射:DepartmentDesignationState
        JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD = 
    rdd_records.mapToPair(new
      PairFunction<Record, String, Tuple2<Long, Integer>>(){
        public Tuple2<String, Tuple2<Long, Integer>> call(Record record){
          Tuple2<String, Tuple2<Long, Integer>> t2 = 
          new Tuple2<String, Tuple2<Long,Integer>>(
            record.Department + record.Designation + record.State,
            new Tuple2<Long, Integer>(record.costToCompany,1)
          );
          return t2;
    }

});

  • 使用组合键,求和costToCompany列和按键累积记录数的reduceByKey
        JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records = 
     records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long,
     Integer>, Tuple2<Long, Integer>>() {
        public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
        Tuple2<Long, Integer> v2) throws Exception {
            return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2);
        }
    });


 类似资料:
  • 我是spark的新手,我想使用group by 我想简化关于CSV,按部门分组,指定,国家,附加列总和(成本到公司)和总员工计数 结果应该是: 有什么方法可以使用转换和操作来实现这一点。或者我们应该进行RDD操作?

  • 我必须解析一个csv文件,并将其内容转储到mysql表中。 第一输出 在第二个输出中,我需要自定义标头水平对齐。例如 对于第二个输出,它可以是我选择的任何一组标题。然后,我可以使用load data infile将这两个输出数据加载到mysql表中。正在寻找awk脚本来实现这一点。如果你还需要什么,请告诉我。德克萨斯州。

  • 问题内容: 我有一系列使用Beautiful Soup解析为单个文本文件的HTML文件。HTML文件的格式设置为使其输出始终为文本文件中的三行,因此输出将类似于: 但这很容易 换句话说,HTML文件的内容在每个文件中并不是真正的标准,但是它们始终会产生三行。 因此,我想知道如果我想从Beautiful Soup生成的文本文件然后将其解析为带有以下内容的列的CSV文件(使用上面的示例),应该从哪里开

  • 我完全是一个AWS新手,试图用AWS Textract将多页文件表解析为CSV文件。在本页中,我尝试使用AWS的示例,但是当我们处理多页文件时,中断,因为在这些情况下我们需要异步处理,正如您在这里的文档中看到的那样。正确的调用函数应该是并在运行后使用。 所以,我用这个逻辑修改了他们的例子,而不是使用函数,修改后的代码看起来像这样: 但是当我运行时,我得到以下错误: 这是因为调用的标准方法是将S3文

  • 问题内容: 我有一组CSV数据要转换为XML。代码看起来不错,但是输出不够完美。它忽略了一些列,因为它们没有值,并且产生了很长的XML数据行,而不是破坏它。 这是我的CSV数据示例: 而我的代码: 当对以上数据执行此代码时,将产生: 我本人以这种形式安排它,但是输出结果很长。产生的输出应为: 问题答案: 我同意Kennet。 我只是添加了 这在元素之间添加了新行,并允许缩进。 更新 首先,我们要介

  • 我有一个基于maven的scala/java混合应用程序,可以提交spar作业。我的应用程序jar“myapp.jar”在lib文件夹中有一些嵌套的jar。其中之一是“common.jar”。我在清单文件中定义了类路径属性,比如。Spark executor抛出在客户端模式下提交应用程序时出错。类(com/myapp/common/myclass.Class)和jar(common.jar)在那里