我正在读取Kafka的流,我将Kafka的值(即JSON)转换为结构。
from_json
有一个变体,它采用String
类型的模式,但我找不到示例。请告知下面代码中的错误。
错误
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '(' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT',
== SQL ==
STRUCT ( `firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY ( STRUCT ( `city`: STRING, `state`: STRING, `zip`: STRING ) ) )
-------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)
public static void main(String[] args) throws AnalysisException {
String master = "local[*]";
String brokers = "quickstart:9092";
String topics = "simple_topic_6";
SparkSession sparkSession = SparkSession
.builder().appName(EmployeeSchemaLoader.class.getName())
.master(master).getOrCreate();
String employeeSchema = "STRUCT ( firstName: STRING, lastName: STRING, email: STRING, " +
"addresses: ARRAY ( STRUCT ( city: STRING, state: STRING, zip: STRING ) ) ) ";
SparkContext context = sparkSession.sparkContext();
context.setLogLevel("ERROR");
SQLContext sqlCtx = sparkSession.sqlContext();
Dataset<Row> employeeDataset = sparkSession.readStream().
format("kafka").
option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics).load();
employeeDataset.printSchema();
employeeDataset = employeeDataset.withColumn("strValue", employeeDataset.col("value").cast("string"));
employeeDataset = employeeDataset.withColumn("employeeRecord",
functions.from_json(employeeDataset.col("strValue"),employeeSchema, new HashMap<>()));
employeeDataset.printSchema();
employeeDataset.createOrReplaceTempView("employeeView");
sparkSession.catalog().listTables().show();
sqlCtx.sql("select * from employeeView").show();
}
你的问题帮助我发现from_json
的变体基于String
的模式仅在Java可用,并且最近在即将到来的2.3.0中被添加到Scala的Spark API中。我一直坚信Scala的Spark API总是功能最丰富的,你的问题帮助我了解到在2.3.0的变化之前不应该如此(!)
回到你的问题,实际上可以用JSON或DDL格式定义基于字符串的模式。
手工编写JSON可能有点麻烦,因此我会采取不同的方法(考虑到我是Scala开发人员,这相当容易)。
我们首先使用Scala的Spark API定义模式。
import org.apache.spark.sql.types._
val addressesSchema = new StructType()
.add($"city".string)
.add($"state".string)
.add($"zip".string)
val schema = new StructType()
.add($"firstName".string)
.add($"lastName".string)
.add($"email".string)
.add($"addresses".array(addressesSchema))
scala> schema.printTreeString
root
|-- firstName: string (nullable = true)
|-- lastName: string (nullable = true)
|-- email: string (nullable = true)
|-- addresses: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- city: string (nullable = true)
| | |-- state: string (nullable = true)
| | |-- zip: string (nullable = true)
这似乎符合你的模式,不是吗?
使用json
方法将模式转换为JSON编码的字符串是一件轻而易举的事。
val schemaAsJson = schema.json
schemaAsJson就是你的JSON字符串,看起来很漂亮。。。六羟甲基三聚氰胺六甲醚。。。复杂的出于显示目的,我宁愿使用prettyJson方法。
scala> println(schema.prettyJson)
{
"type" : "struct",
"fields" : [ {
"name" : "firstName",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "lastName",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "email",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "addresses",
"type" : {
"type" : "array",
"elementType" : {
"type" : "struct",
"fields" : [ {
"name" : "city",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "state",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "zip",
"type" : "string",
"nullable" : true,
"metadata" : { }
} ]
},
"containsNull" : true
},
"nullable" : true,
"metadata" : { }
} ]
}
这是JSON中的模式。
您可以使用数据类型并“验证”JSON字符串(使用Spark在封面下为来自JSON的数据类型使用的DataType.fromJson)。
import org.apache.spark.sql.types.DataType
val dt = DataType.fromJson(schemaAsJson)
scala> println(dt.sql)
STRUCT<`firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY<STRUCT<`city`: STRING, `state`: STRING, `zip`: STRING>>>
一切似乎都很好。介意我用样本数据集检查一下吗?
val rawJsons = Seq("""
{
"firstName" : "Jacek",
"lastName" : "Laskowski",
"email" : "jacek@japila.pl",
"addresses" : [
{
"city" : "Warsaw",
"state" : "N/A",
"zip" : "02-791"
}
]
}
""").toDF("rawjson")
val people = rawJsons
.select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
.select("json.*") // <-- flatten the struct field
.withColumn("address", explode($"addresses")) // <-- explode the array field
.drop("addresses") // <-- no longer needed
.select("firstName", "lastName", "email", "address.*") // <-- flatten the struct field
scala> people.show
+---------+---------+---------------+------+-----+------+
|firstName| lastName| email| city|state| zip|
+---------+---------+---------------+------+-----+------+
| Jacek|Laskowski|jacek@japila.pl|Warsaw| N/A|02-791|
+---------+---------+---------------+------+-----+------+
在Martin Fowler的书中,我读到了和模式。 作者提到,将identityMap放在UnitOfWork内部是一个好主意。但怎么做呢? 据我所知,受会话限制,但作者没有提到 每个unitOfWork实例需要多少个IdentityMap实例? 如果我们有两个并发请求呢?
我尝试使ajv使用两个JSON-Schema,一个依赖于另一个。下面是我的模式的一个示例(简化): json 错误:没有带有键或引用“http://json-schema.org/draft-04/schema#”的模式 更新:如果我从types.json中删除“$schema…”,我得到的错误是: MissingReferror:无法从id#中解析引用types.json#/definition
我希望根据模式的maximal/minimum(number)或maximumlength/minimumlength(string)来验证模式。我有一个json表单: 为什么这个例子不能使用验证?我的模型未验证为false。根据本文档,可以在数组中定义不同的类型,但是如何基于最小/最大值进行验证呢?
对我来说,这似乎是如此基本和简单,但我在网上没有找到任何东西。 有许多关于如何获得JSON模式的示例,也有许多如何从以下对象创建mongoose模式的示例: 如果我试图直接放置JSON模式,我会得到一个错误 但是我在网上找不到从一种类型到另一种类型的转移。 以前有人有这个问题吗?
到目前为止我的代码是:
我想将我的字符串转换为 LocalTime 格式 我收到错误: 线程“main”java.time.format中出现异常。DateTimeParseException:无法在索引0处分析文本“3:30” 预期产出: 上午3:30