当前位置: 首页 > 知识库问答 >
问题:

SparkSQLJava:使用数据集时将Pojo转换为表格格式

韦锦程
2023-03-14

我对Spark SQL很陌生。在执行一项培训任务时,我遇到了以下问题,无法找到答案(以下所有示例都有点愚蠢,但出于演示目的,应该仍然可以)。

我的应用程序读取拼花文件并根据其内容创建数据集:

DataFrame input = sqlContext.read().parquet("src/test/resources/integration/input/source.gz.parquet");
Dataset<Row> dataset = input.as(RowEncoder$.MODULE$.apply(input.schema()));

数据集。show()调用结果:

+------------+----------------+--------+
+    Names   +       Gender   +   Age  +
+------------+----------------+--------+
| Jack, Jill |  Male, Female  | 30, 25 |

然后,我将数据集转换为一个新的数据集,其中包含Person类型:

public static Dataset<Person> transformToPerson(Dataset<Row> rawData) {
    return rawData
            .flatMap((Row sourceRow) -> {
                // code to parse an input row and split person data goes here
                Person person1 = new Person(name1, gender1, age1);
                Person person2 = new Person(name2, gender2, age2);
                return Arrays.asList(person1, person2);
            }, Encoders.bean(Person.class));
}

哪里

public abstract class Human implements Serializable {
   protected String name;
   protected String gender;
   // getters/setters go here
   // default constructor + constructor with the name and gender params
 }
 public class Person extends Human {
   private String age;
   // getters/setters for the age param go here
   // default constructor + constructor with the age, name and gender params
   // overriden toString() method which returns the string: (<name>, <gender>, <age>)
 }

最后,当我显示数据集的内容时,我希望看到

 +------------+----------------+--------+
 +    name    +       gender   +   age  +
 +------------+----------------+--------+
 |     Jack   |     Male       |   30   |
 |     Jill   |     Femail     |   25   |

然而,我明白了

+-------------------+----------------+--------+
+      name         +       gender   +   age  +
+-------------------+----------------+--------+
|(Jack, Male, 30)   |                |        |
|(Jill, Femail, 25) |                |        |

这是toString()方法的结果,而标头是正确的。我相信编码器有问题,如果我使用它显示的Encoders.java序列化(T)或Encoders.kryo(T)

+------------------+
+        value     +
+------------------+
|(Jack, Male, 30)  |
|(Jill, Femail, 25)|

我最担心的是,编码器的错误使用可能会导致错误的SerDe和/或性能损失。在我能找到的所有Spark Java示例中,我看不到任何特殊之处。。。

你能告诉我我做错了什么吗?

更新1

以下是我的项目的依赖项:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

解决方案

正如abaghel建议的那样,我将版本升级到2.0.2(请注意,在2.0.0版本上存在适用于Windows的错误),在我的代码中到处使用Dataset而不是DataFrames(似乎DataFrames不是Apache Spark的一部分从2.0.0开始),并使用基于迭代器的平面图函数从行转换为人。

仅与大家分享,在1.6.2版中使用基于TraversableOnce的flatMap的方法对我来说并不适用,因为它引发了“MyPersonversion$function1 not Serializable”异常

现在一切正常。

共有1个答案

何志业
2023-03-14

您使用的Spark版本是什么?您提供的平面图方法未使用版本2.2.0编译。所需的返回类型是Iterator

public static Dataset<Person> transformToPerson(Dataset<Row> rawData) {
    return rawData.flatMap(row -> {
        String[] nameArr = row.getString(0).split(",");
        String[] genArr = row.getString(1).split(",");
        String[] ageArr = row.getString(2).split(",");
        Person person1 = new Person(nameArr[0], genArr[0], ageArr[0]);
        Person person2 = new Person(nameArr[1], genArr[1], ageArr[1]);
        return Arrays.asList(person1, person2).iterator();
    }, Encoders.bean(Person.class));
}

//Call function
Dataset<Person> dataset1 = transformToPerson(dataset);
dataset1.show();

 类似资料:
  • null 这是很好的,因为我可以确定我的API请求是否成功。 但是: 由于JSend格式有它自己的东西,它在响应时也有一个小的状态指示器,如下所示: 我是否应该坚持手动解析并使用而不是我的模型 对于类型paramter? 因为这样,我可以使用并将字符串转换为JSON,然后我可以手动解析模型,就像为它们编写解析器一样。

  • 您能告诉我如何使用java中的ApachePOI将html表数据呈现到Excel单元格中吗。我有以下要求,比如。。 但它只是作为文本字符串...在这里输入图像描述

  • 我需要将长数据格式(long)转换为宽格式(wide),条件如下(如果可能): 1) 所有数据文件都将是具有相同结构(id、名称、值)的长格式(long),但每个数据文件将具有不同的变量、值和变量数: 2) 每个数据文件将是不同的变量混合物(因子、整数、数字)。有些因素可能每个案例都有多个级别(从长远来看是水果和肉),我想为这些因素中的每个级别创建一个单独的虚拟变量(逻辑)。因子和数值变量的数量将

  • 我在课堂上收到了一个CSV,我需要获取用于装箱POJO的值。我不必在目录中打开“file.csv”,由Flink将逗号分隔的元素传递给EventDeserializationSchema,这个元素用于“Event Class”来处理每个事件。 以下是一个示例: 在:“‘亚当’、‘史密斯’,66,….‘12:01:00.000’”- 为此,我正在使用:https://github.com/Faste

  • pre { white-space: pre-wrap; } 本实例演示如何转换表格(table)为数据网格(datagrid)。 数据网格(datagrid)的列信息是定义在 <thead> 标记中,数据是定义在 <tbody> 标记中。确保为所有的数据列设置 field 名称,请看下面的实例:     <table id="tt">         <thead>             <

  • 问题内容: 我必须使用使用JSend格式的API 。 长话短说,它使用HTTP状态代码来指示状态,例如: 200就是成功 406未经授权 这很好,因为我可以据此确定我的API请求是否成功。 但: 由于JSend格式是它自己的东西,因此它在响应中还带有一个状态指示器,如下所示: 因此,它具有一个“状态”字段,该字段还显示API请求是否成功。 问题: 进行了改进以解析对POJO的响应,因此假定响应仅包