当前位置: 首页 > 知识库问答 >
问题:

使用Apache Spark和Java将CSV解析为数据帧/数据集

史淳
2023-03-14

我是spark的新手,我想使用group by

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

我想简化关于CSV,按部门分组,指定,国家,附加列总和(成本到公司)和总员工计数

结果应该是:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

有什么方法可以使用转换和操作来实现这一点。或者我们应该进行RDD操作?

共有3个答案

邹学民
2023-03-14

下面的内容可能并不完全正确,但它应该让您了解如何处理数据。它并不漂亮,应该用case类等替换,但作为如何使用spark api的一个快速示例,我希望它足够了:)

val rawlines = sc.textfile("hdfs://.../*.csv")
case class Employee(dep: String, des: String, cost: Double, state: String)
val employees = rawlines
  .map(_.split(",") /*or use a proper CSV parser*/
  .map( Employee(row(0), row(1), row(2), row(3) )

# the 1 is the amount of employees (which is obviously 1 per line)
val keyVals = employees.map( em => (em.dep, em.des, em.state), (1 , em.cost))

val results = keyVals.reduceByKey{ a,b =>
    (a._1 + b._1, b._1, b._2) # (a.count + b.count , a.cost + b.cost )
}

#debug output
results.take(100).foreach(println)

results
  .map( keyval => someThingToFormatAsCsvStringOrWhatever )
  .saveAsTextFile("hdfs://.../results")

或者可以使用SparkSQL:

val sqlContext = new SQLContext(sparkContext)

# case classes can easily be registered as tables
employees.registerAsTable("employees")

val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*) 
  from employees 
  group by dep,des,state"""
羊舌承天
2023-03-14

CSV文件可以通过Spark内置的CSV阅读器进行解析。成功读取文件后,它将返回DataFrame/DataSet。在DataFrame/DataSet之上,您可以轻松地应用类似SQL的操作。

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL Example")
    .getOrCreate();
import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
    .add("department", "string")
    .add("designation", "string")
    .add("ctc", "long")
    .add("state", "string");
Dataset<Row> df = spark.read()
    .option("mode", "DROPMALFORMED")
    .schema(schema)
    .csv("hdfs://path/input.csv");

从CSV文件读取数据的更多选项

在spark sql元存储中注册一个表以执行sql操作

df.createOrReplaceTempView("employee");

在已注册的数据帧上运行SQL查询

Dataset<Row> sqlResult = spark.sql(
    "SELECT department, designation, state, SUM(ctc), COUNT(department)" 
        + " FROM employee GROUP BY department, designation, state");

sqlResult.show(); //for testing

我们甚至可以直接在CSV文件上执行SQL,而不用使用Spark SQL创建表

为sql函数执行必要的导入

import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;

在dataframe/dataset上使用group byagg对数据执行计数

Dataset<Row> dfResult = df.groupBy("department", "designation", "state")
    .agg(sum("ctc"), count("department"));
// After Spark 1.6 columns mentioned in group by will be added to result by default

dfResult.show();//for testing
"org.apache.spark" % "spark-core_2.11" % "2.0.0" 
"org.apache.spark" % "spark-sql_2.11" % "2.0.0"
岳昊空
2023-03-14

>

public class Record implements Serializable {
  String department;
  String designation;
  long costToCompany;
  String state;
  // constructor , getters and setters  
}

加载CVS(JSON)文件

JavaSparkContext sc;
JavaRDD<String> data = sc.textFile("path/input.csv");
//JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions 
SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified


JavaRDD<Record> rdd_records = sc.textFile(data).map(
  new Function<String, Record>() {
      public Record call(String line) throws Exception {
         // Here you can use JSON
         // Gson gson = new Gson();
         // gson.fromJson(line, Record.class);
         String[] fields = line.split(",");
         Record sd = new Record(fields[0], fields[1], fields[2].trim(), fields[3]);
         return sd;
      }
});

此时,您有两种方法:

>

  • 注册一个表(使用您定义的模式类)

    JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
    table.registerAsTable("record_table");
    table.printSchema();
    

    通过所需的查询分组查询表

    JavaSchemaRDD res = sqlContext.sql("
      select department,designation,state,sum(costToCompany),count(*) 
      from record_table 
      group by department,designation,state
    ");
    

    在这里,您还可以使用SQL方法执行任何其他需要的查询

    >

    JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD = 
    rdd_records.mapToPair(new
      PairFunction<Record, String, Tuple2<Long, Integer>>(){
        public Tuple2<String, Tuple2<Long, Integer>> call(Record record){
          Tuple2<String, Tuple2<Long, Integer>> t2 = 
          new Tuple2<String, Tuple2<Long,Integer>>(
            record.Department + record.Designation + record.State,
            new Tuple2<Long, Integer>(record.costToCompany,1)
          );
          return t2;
    }
    

    });

    减少使用复合键的ByKey,求和成本ToCompany列,并按键累加记录数

    JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records = 
     records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long,
     Integer>, Tuple2<Long, Integer>>() {
        public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
        Tuple2<Long, Integer> v2) throws Exception {
            return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2);
        }
    });
    

  •  类似资料:
    • 问题内容: 我有一个我希望导出到CSV文件的文件。但是,pandas似乎将某些值而不是类型写为。我找不到如何更改此行为。 建立数据框: 查看: 导出它: 为什么十位数有一个零点? 当然,我可以将此函数粘贴到管道中以重新转换整个CSV文件,但似乎没有必要: 问题答案: 我正在寻找的答案与@Jeff在答案中提出的内容略有不同。功劳归于他。最终解决了我的问题,以供参考:

    • 问题内容: 我有以上述格式给出的.Data文件。我正在用Java编写程序,该程序将从.data文件中获取值并将其放入缓冲区中。我的Java程序通过JDBC连接到Mysql(windows)。所以我需要从上述格式的文件中读取值,并将其放入缓冲区 这样,我将存储这些值,并且jdbc将填充Mysql(windows)上的数据库表。请告诉我最好的方法。 问题答案: 查看此问题的答案,以读取文件行并将其拆分

    • 我想将包含字符串记录的RDD转换为Spark数据帧,如下所示。 模式行不在同一个中,而是在另一个变量中: 所以现在我的问题是,我如何使用上面两个,在Spark中创建一个数据帧?我使用的是Spark 2.2版。 我确实搜索并看到了一篇帖子:我可以使用spack-csv将表示为字符串的CSV读取到Apache Spark中吗?然而,这并不是我所需要的,我也无法找到一种方法来修改这段代码以在我的情况下工

    • Python是如何将CSV文件读入pandas数据帧的(我可以使用它进行统计操作,可以有不同类型的列,等等)? 我的CSV文件有以下内容: 在R中,我们将使用以下方法读取此文件: 这将返回一个R数据。框架: 有没有类似python的方法来获得相同的功能?

    • 我正在尝试根据条件从数据帧创建csv,例如如果特定列不为null,则需要将其添加到csv文件中。我的代码确实会根据条件转换文件,但最终会添加一个额外的空行。检查屏幕截图 这是我的代码: 如何从csv文件中删除最后一个空行。

    • 我正在用Kafka设计一个spark流媒体应用程序。我有以下几个问题:我正在将数据从RDBMS表流式传输到kafka,并使用Spark consumer来使用消息,并使用Spark-SQL进行处理 问题:1。我将数据从表中流式传输到kafka as(键作为表名,值作为JSON记录形式的表数据)——这是正确的体系结构吗? 这种数据库流的架构和设计是否正常,我如何解决转换问题中的转换? 你好Piyus