当前位置: 首页 > 知识库问答 >
问题:

如何将from_json与模式作为字符串(即json编码的模式)一起使用?

呼延学
2023-03-14

我正在读取Kafka的流,我将Kafka的值(即JSON)转换为结构。

from_json有一个变体,它采用String类型的模式,但我找不到示例。请告知下面代码中的错误。

错误

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
extraneous input '(' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT',

== SQL ==
STRUCT ( `firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY ( STRUCT ( `city`: STRING, `state`: STRING, `zip`: STRING )  )  ) 
-------^^^

at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)

程序

public static void main(String[] args) throws AnalysisException {
    String master = "local[*]";
    String brokers = "quickstart:9092";
    String topics = "simple_topic_6";

    SparkSession sparkSession = SparkSession
            .builder().appName(EmployeeSchemaLoader.class.getName())
            .master(master).getOrCreate();

   String employeeSchema = "STRUCT ( firstName: STRING, lastName: STRING, email: STRING, " +
            "addresses: ARRAY ( STRUCT ( city: STRING, state: STRING, zip: STRING )  )  ) ";

    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();

    Dataset<Row> employeeDataset = sparkSession.readStream().
            format("kafka").
            option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topics).load();
    employeeDataset.printSchema();
    employeeDataset = employeeDataset.withColumn("strValue", employeeDataset.col("value").cast("string"));
    employeeDataset = employeeDataset.withColumn("employeeRecord",
            functions.from_json(employeeDataset.col("strValue"),employeeSchema, new HashMap<>()));

    employeeDataset.printSchema();
    employeeDataset.createOrReplaceTempView("employeeView");

    sparkSession.catalog().listTables().show();

    sqlCtx.sql("select * from employeeView").show();
}

共有1个答案

易镜
2023-03-14

你的问题帮助我发现from_json的变体基于String的模式仅在Java可用,并且最近在即将到来的2.3.0中被添加到Scala的Spark API中。我一直坚信Scala的Spark API总是功能最丰富的,你的问题帮助我了解到在2.3.0的变化之前不应该如此(!)

回到你的问题,实际上可以用JSON或DDL格式定义基于字符串的模式。

手工编写JSON可能有点麻烦,因此我会采取不同的方法(考虑到我是Scala开发人员,这相当容易)。

我们首先使用Scala的Spark API定义模式。

import org.apache.spark.sql.types._
val addressesSchema = new StructType()
  .add($"city".string)
  .add($"state".string)
  .add($"zip".string)
val schema = new StructType()
  .add($"firstName".string)
  .add($"lastName".string)
  .add($"email".string)
  .add($"addresses".array(addressesSchema))
scala> schema.printTreeString
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- zip: string (nullable = true)

这似乎符合你的模式,不是吗?

使用json方法将模式转换为JSON编码的字符串是一件轻而易举的事。

val schemaAsJson = schema.json

schemaAsJson就是你的JSON字符串,看起来很漂亮。。。六羟甲基三聚氰胺六甲醚。。。复杂的出于显示目的,我宁愿使用prettyJson方法。

scala> println(schema.prettyJson)
{
  "type" : "struct",
  "fields" : [ {
    "name" : "firstName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "lastName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "email",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "addresses",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "city",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "state",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "zip",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  } ]
}

这是JSON中的模式。

您可以使用数据类型并“验证”JSON字符串(使用Spark在封面下为来自JSON的数据类型使用的DataType.fromJson)。

import org.apache.spark.sql.types.DataType
val dt = DataType.fromJson(schemaAsJson)
scala> println(dt.sql)
STRUCT<`firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY<STRUCT<`city`: STRING, `state`: STRING, `zip`: STRING>>>

一切似乎都很好。介意我用样本数据集检查一下吗?

val rawJsons = Seq("""
  {
    "firstName" : "Jacek",
    "lastName" : "Laskowski",
    "email" : "jacek@japila.pl",
    "addresses" : [
      {
        "city" : "Warsaw",
        "state" : "N/A",
        "zip" : "02-791"
      }
    ]
  }
""").toDF("rawjson")
val people = rawJsons
  .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
  .select("json.*") // <-- flatten the struct field
  .withColumn("address", explode($"addresses")) // <-- explode the array field
  .drop("addresses")  // <-- no longer needed
  .select("firstName", "lastName", "email", "address.*") // <-- flatten the struct field
scala> people.show
+---------+---------+---------------+------+-----+------+
|firstName| lastName|          email|  city|state|   zip|
+---------+---------+---------------+------+-----+------+
|    Jacek|Laskowski|jacek@japila.pl|Warsaw|  N/A|02-791|
+---------+---------+---------------+------+-----+------+
 类似资料:
  • 在Martin Fowler的书中,我读到了和模式。 作者提到,将identityMap放在UnitOfWork内部是一个好主意。但怎么做呢? 据我所知,受会话限制,但作者没有提到 每个unitOfWork实例需要多少个IdentityMap实例? 如果我们有两个并发请求呢?

  • 我尝试使ajv使用两个JSON-Schema,一个依赖于另一个。下面是我的模式的一个示例(简化): json 错误:没有带有键或引用“http://json-schema.org/draft-04/schema#”的模式 更新:如果我从types.json中删除“$schema…”,我得到的错误是: MissingReferror:无法从id#中解析引用types.json#/definition

  • 我希望根据模式的maximal/minimum(number)或maximumlength/minimumlength(string)来验证模式。我有一个json表单: 为什么这个例子不能使用验证?我的模型未验证为false。根据本文档,可以在数组中定义不同的类型,但是如何基于最小/最大值进行验证呢?

  • 对我来说,这似乎是如此基本和简单,但我在网上没有找到任何东西。 有许多关于如何获得JSON模式的示例,也有许多如何从以下对象创建mongoose模式的示例: 如果我试图直接放置JSON模式,我会得到一个错误 但是我在网上找不到从一种类型到另一种类型的转移。 以前有人有这个问题吗?

  • 我想将我的字符串转换为 LocalTime 格式 我收到错误: 线程“main”java.time.format中出现异常。DateTimeParseException:无法在索引0处分析文本“3:30” 预期产出: 上午3:30