当前位置: 首页 > 知识库问答 >
问题:

如何在Spark Java中创建复杂的StructType模式

巫马浩言
2023-03-14

如何使用Spark Java中的StructType为以下数据定义数据类型?

sam|mars|1234567|"report": {"Details": [{"subject": "science","grade": "A","remark": "good"},{"subject": "maths","grade": "E","remark": "excellent"},{"subject": "geography","grade": "E","remark": "excellent"}]}
harry|venus|987654|"report": {"Details": [{"subject": "science","grade": "O","remark": "outstanding"},{"subject": "history","grade": "A","remark": "good"}]}

字段包括:姓名、地址、ID、REPORTCARD

我有以下代码:

        JavaRDD<Row> row = javaRDD.map(new Function<String, Row>(){
            @Override
            public Row call(String line) throws Exception {
                return RowFactory.create((line.split("|")));
            }
        });
    where, 
    javaRDD is created on top of the above input data.

现在,我需要使用以下行将javaRDD转换为数据帧(数据集df):

            Dataset<Row> df = spark.createDataFrame(row, <STRUCT TYPE SCHEMA>);

我需要为此创建StructType架构。如何在Spark Java中定义它。

我创建了以下StructType模式:

            List<StructField> reportFields = new ArrayList<StructField>();
            reportFields.add(DataTypes.createStructField("subject", DataTypes.StringType, true));
            reportFields.add(DataTypes.createStructField("grade", DataTypes.StringType, true));
            reportFields.add(DataTypes.createStructField("remark", DataTypes.StringType, true));

            List<StructField> schemaFields = new ArrayList<StructField>();
            schemaFields.add(DataTypes.createStructField("NAME", DataTypes.StringType, true));
            schemaFields.add(DataTypes.createStructField("ADDRESS", DataTypes.StringType, true));
            schemaFields.add(DataTypes.createStructField("ID", DataTypes.StringType, true));
            schemaFields.add(DataTypes.createStructField("REPORTCARD", DataTypes.createStructType(reportFields), true));
            StructType schema = DataTypes.createStructType(schemaFields);

            Dataset<Row> df = spark.createDataFrame(row, schema);

但我得到了以下例外:

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of struct<subject:string,grade:string,remark:string>
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, NAME), StringType), true, false) AS NAME#0
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, ADDRESS), StringType), true, false) AS ADDRESS#1
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, ID), StringType), true, false) AS ID#2
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else named_struct(subject, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)), 0, subject), StringType), true, false), grade, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)), 1, grade), StringType), true, false), remark, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)), 2, remark), StringType), true, false)) AS REPORTCARD#3

共有2个答案

施誉
2023-03-14

你可以试试这个,

Dataset<Row> temp=row.withColumn("Details",struct("subject","grade","remark"))
                     .agg(collect_list("Details").as("Details")); 

这将适用于“详细信息”部分!

你也可以在“报告”中使用同样的方法。

孟花蜂
2023-03-14

这个StructType应该足够了

   StructType details = new StructType(new StructField[]{
    new StructField("subject", DataTypes.StringType, false, Metadata.empty()),
    new StructField("grade", DataTypes.StringType, false, Metadata.empty()),
    new StructField("remark", DataTypes.StringType, false, Metadata.empty())
   });

   StructType recordType = new StructType();
   recordType = recordType.add("details", details, false);

   StructType structType = new StructType();
   structType = structType.add("name", DataTypes.StringType, false);
   structType = structType.add("planet", DataTypes.StringType, false);
   structType = structType.add("number", DataTypes.StringType, false);
   structType = structType.add("record", recordType, false);
 类似资料:
  • 我想创建一个自定义的nifi处理器,这样我就可以读取s7 plc数据。为此,我想将这个项目的java代码:https://github.com/s7connector/s7connector转换为一个nifi处理器。 因此,我已经下载了mvn包类型,就像webiste告诉的那样:https://medium.com/hashmapinc/creating-custom-processors-and

  • 8.4 关于Director的进一步讨论        指挥者类Director在建造者模式中扮演非常重要的作用,简单的Director类用于指导具体建造者如何构建产品,它按一定次序调用Builder的buildPartX()方法,控制调用的先后次序,并向客户端返回一个完整的产品对象。下面我们讨论几种Director的高级应用方式:        1.省略Director 在有些情况下,为了简化系

  • 8.3 完整解决方案 Sunny公司开发人员决定使用建造者模式来实现游戏角色的创建,其基本结构如图8-3所示: 图8-3 游戏角色创建结构图 在图8-3中,ActorController充当指挥者,ActorBuilder充当抽象建造者,HeroBuilder、AngelBuilder和DevilBuilder充当具体建造者,Actor充当复杂产品。完整代码如下所示: //Actor角色类:复杂产

  • 没有人买车会只买一个轮胎或者方向盘,大家买的都是一辆包含轮胎、方向盘和发动机等多个部件的完整汽车。如何将这些部件组装成一辆完整的汽车并返回给用户,这是建造者模式需要解决的问题。建造者模式又称为生成器模式,它是一种较为复杂、使用频率也相对较低的创建型模式。建造者模式为客户端返回的不是一个简单的产品,而是一个由多个部件组成的复杂产品。 8.1 游戏角色设计 Sunny软件公司游戏开发小组决定开发一款名

  • 我有一个类似这样的JSON: 我正在尝试将此结构映射到 Spark 架构。我已经创建了以下内容;但是它不起作用。我还尝试在值字段映射中移除。 另外,请注意,它们“key1”和“key2”是动态字段,将使用唯一标识符生成。也可以有两个以上的键。有没有人能够将数组类型映射到结构类型?

  • 严格来说这不是使用 uniapp 的问题,只是刚好在用 uniapp 开发项目遇到的问题。 项目需要兼容Android、h5、微信小程序,现在有个后台管理的功能,需要做到移动端,在web端的时候,因为页面比较大,并且也比较好利用弹框这种交互,所以创建基本在一个页面就可以完成了(至少不用跳转页面),但是移动端,见面比较小,当一个表单比较复杂的时候,通常被设计为多个页面填写表单,才能完成整个业务的创建