当前位置: 首页 > 知识库问答 >
问题:

使用Kafka解析嵌套json的模式

乐正乐湛
2023-03-14

我收到了来自Kafka的JSON字符串,需要由PySpark处理。字符串如下所示:

{"_id": {"$oid": "5eb56a371af2d82e242d24ae"},"Id": 7,"Timestamp": {"$date": 1582889068586},"TTNR": "R902170286","SNR": 92177446,"State": 0,"I_A1": "FALSE","I_B1": "FALSE","I1": 0.0037385,"Mabs": -20.3711051,"p_HD1": 30.9632005,"pG": 27.788934,"pT": 1.7267373,"pXZ": 3.4487671,"T3": 25.2357555,"nan": 202.1999969,"Q1": 0,"a_01X": [62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925]}

我的计划是将字符串分成JSON字段。为此,我定义了以下模式:

json_schema=StructType([StructField("_id",StructField("$oid",StringType())), \
    StructField("Id", DoubleType()), \
    StructType(StructField("Timestamp", StructField("$date",LongType())), \
    StructField("TTNR", StringType()), \
    StructField("SNR", LongType()), \
    StructField("State", LongType()), \
    StructField("I_A1", StringType()), \
    StructField("I_B1", StringType()), \
    StructField("I1", DoubleType()), \
    StructField("Mabs", DoubleType()), \
    StructField("p_HD1", DoubleType()), \
    StructField("pG", DoubleType()), \
    StructField("pT", DoubleType()), \
    StructField("pXZ", DoubleType()), \
    StructField("T3", DoubleType()), \
    StructField("nan", DoubleType()), \
    StructField("Q1", LongType()), \
    StructField("a_01X", ArrayType(DoubleType()))
    ])

但是,使用此架构会导致以下错误:

pyspark.sql.utils.ParseException: u'\nmismatched input \'{\' expecting {\'SELECT\', \'FROM\', \'ADD\', \'AS\', \'ALL\', \'ANY\', \'DISTINCT\', \'WHERE\', \'GROUP\', \'BY\', \'GROUPING\', \'SETS\', \'CUBE\', \'ROLLUP\', \'ORDER\', \'HAVING\', \'LIMIT\', \'AT\', \'OR\', \'AND\', \'IN\', NOT, \'NO\', \'EXISTS\', \'BETWEEN\', \'LIKE\', RLIKE, \'IS\', \'NULL\', \'TRUE\', \'FALSE\', \'NULLS\', \'ASC\', \'DESC\', \'FOR\', \'INTERVAL\', \'CASE\', \'WHEN\', \'THEN\', \'ELSE\', \'END\', \'JOIN\', \'CROSS\', \'OUTER\', \'INNER\', \'LEFT\', \'SEMI\', \'RIGHT\', \'FULL\', \'NATURAL\', \'ON\', \'PIVOT\', \'LATERAL\', \'WINDOW\', \'OVER\', \'PARTITION\', \'RANGE\', \'ROWS\', \'UNBOUNDED\', \'PRECEDING\', \'FOLLOWING\', \'CURRENT\', \'FIRST\', \'AFTER\', \'LAST\', \'ROW\', \'WITH\', \'VALUES\', \'CREATE\', \'TABLE\', \'DIRECTORY\', \'VIEW\', \'REPLACE\', \'INSERT\', \'DELETE\', \'INTO\', \'DESCRIBE\', \'EXPLAIN\', \'FORMAT\', \'LOGICAL\', \'CODEGEN\', \'COST\', \'CAST\', \'SHOW\', \'TABLES\', \'COLUMNS\', \'COLUMN\', \'USE\', \'PARTITIONS\', \'FUNCTIONS\', \'DROP\', \'UNION\', \'EXCEPT\', \'MINUS\', \'INTERSECT\', \'TO\', \'TABLESAMPLE\', \'STRATIFY\', \'ALTER\', \'RENAME\', \'ARRAY\', \'MAP\', \'STRUCT\', \'COMMENT\', \'SET\', \'RESET\', \'DATA\', \'START\', \'TRANSACTION\', \'COMMIT\', \'ROLLBACK\', \'MACRO\', \'IGNORE\', \'BOTH\', \'LEADING\', \'TRAILING\', \'IF\', \'POSITION\', \'EXTRACT\', \'DIV\', \'PERCENT\', \'BUCKET\', \'OUT\', \'OF\', \'SORT\', \'CLUSTER\', \'DISTRIBUTE\', \'OVERWRITE\', \'TRANSFORM\', \'REDUCE\', \'SERDE\', \'SERDEPROPERTIES\', \'RECORDREADER\', \'RECORDWRITER\', \'DELIMITED\', \'FIELDS\', \'TERMINATED\', \'COLLECTION\', \'ITEMS\', \'KEYS\', \'ESCAPED\', \'LINES\', \'SEPARATED\', \'FUNCTION\', \'EXTENDED\', \'REFRESH\', \'CLEAR\', \'CACHE\', \'UNCACHE\', \'LAZY\', \'FORMATTED\', \'GLOBAL\', TEMPORARY, \'OPTIONS\', \'UNSET\', \'TBLPROPERTIES\', \'DBPROPERTIES\', \'BUCKETS\', \'SKEWED\', \'STORED\', \'DIRECTORIES\', \'LOCATION\', \'EXCHANGE\', \'ARCHIVE\', \'UNARCHIVE\', \'FILEFORMAT\', \'TOUCH\', \'COMPACT\', \'CONCATENATE\', \'CHANGE\', \'CASCADE\', \'RESTRICT\', \'CLUSTERED\', \'SORTED\', \'PURGE\', \'INPUTFORMAT\', \'OUTPUTFORMAT\', DATABASE, DATABASES, \'DFS\', \'TRUNCATE\', \'ANALYZE\', \'COMPUTE\', \'LIST\', \'STATISTICS\', \'PARTITIONED\', \'EXTERNAL\', \'DEFINED\', \'REVOKE\', \'GRANT\', \'LOCK\', \'UNLOCK\', \'MSCK\', \'REPAIR\', \'RECOVER\', \'EXPORT\', \'IMPORT\', \'LOAD\', \'ROLE\', \'ROLES\', \'COMPACTIONS\', \'PRINCIPALS\', \'TRANSACTIONS\', \'INDEX\', \'INDEXES\', \'LOCKS\', \'OPTION\', \'ANTI\', \'LOCAL\', \'INPATH\', IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 0)\n\n== SQL ==\n{"fields":[{"metadata":{},"name":"_id","nullable":true,"type":{"metadata":{},"name":"$oid","nullable":true,"type":"string"}},{"metadata":{},"name":"Id","nullable":true,"type":"double"},{"metadata":{},"name":"Timestamp","nullable":true,"type":"string"},{"metadata":{},"name":"TTNR","nullable":true,"type":"string"},{"metadata":{},"name":"SNR","nullable":true,"type":"long"},{"metadata":{},"name":"State","nullable":true,"type":"long"},{"metadata":{},"name":"I_A1","nullable":true,"type":"string"},{"metadata":{},"name":"I_B1","nullable":true,"type":"string"},{"metadata":{},"name":"I1","nullable":true,"type":"double"},{"metadata":{},"name":"Mabs","nullable":true,"type":"double"},{"metadata":{},"name":"p_HD1","nullable":true,"type":"double"},{"metadata":{},"name":"pG","nullable":true,"type":"double"},{"metadata":{},"name":"pT","nullable":true,"type":"double"},{"metadata":{},"name":"pXZ","nullable":true,"type":"double"},{"metadata":{},"name":"T3","nullable":true,"type":"double"},{"metadata":{},"name":"nan","nullable":true,"type":"double"},{"metadata":{},"name":"Q1","nullable":true,"type":"long"},{"metadata":{},"name":"a_01X","nullable":true,"type":{"containsNull":true,"elementType":"double","type":"array"}}],"type":"struct"}\n^^^\n'

但是,如果我使用没有嵌套字段的模式(如下所示),我可以解析:

after_schema=StructType([ StructField("_id", StringType()), \
    StructField("Id", DoubleType()), \
    StructField("Timestamp", StringType()),
    StructField("TTNR", StringType()), \
    StructField("SNR", LongType()), \
    StructField("State", LongType()), \
    StructField("I_A1", StringType()), \
    StructField("I_B1", StringType()), \
    StructField("I1", DoubleType()), \
    StructField("Mabs", DoubleType()), \
    StructField("p_HD1", DoubleType()), \
    StructField("pG", DoubleType()), \
    StructField("pT", DoubleType()), \
    StructField("pXZ", DoubleType()), \
    StructField("T3", DoubleType()), \
    StructField("nan", DoubleType()), \
    StructField("Q1", LongType()), \
    StructField("a_01X", ArrayType(DoubleType()))
    ])

我的目标是得到这样的输出:

+---+---+---------+----+---+-----+----+----+---+----+-----+---+---+---+---+---+---+-----+
|_id| Id|Timestamp|TTNR|SNR|State|I_A1|I_B1| I1|Mabs|p_HD1| pG| pT|pXZ| T3|nan| Q1|a_01X|
+---+---+---------+----+---+-----+----+----+---+----+-----+---+---+---+---+---+---+-----+
+---+---+---------+----+---+-----+----+----+---+----+-----+---+---+---+---+---+---+-----+

我想在这方面得到一些帮助。现在我可以得到除嵌套结构之外的所有字段。

我使用的模式如下:

jsonDF = data_stream_value.select(from_json(data_stream_value.value,json_schema).alias("json_detail"))
jsonDF = jsonDF.select("json_detail.*")

Adam提到的模式适用于这个特定的字符串。但对于其他字符串(见下文),它会给出空值:

{"_id":{"$oid":"5eba5d4e5ae37509e418189d"},"Id":10437922,"Timestamp":{"$date":1589271853129},"TTNR":"R902154429","SNR":92294039,"State":1,"I_A1":"TRUE","I_B1":"FALSE","I1":1.587878,"Mabs":179.9749963,"p_HD1":198.7518137,"pG":25.0093936,"pT":0.0548027,"pXZ":12.6616249,"T3":58.1928239,"nan":1807.8800049,"Q1":94.4408417,"a_01X":[-7.541928,-8.724496,-8.2814293,-6.9679567,-5.6328863,-4.7506769,-4.0492855,-3.2090786,-1.9416894,0.8350586,3.5160924,6.4952873,8.5267264,8.2908783,6.4315302,3.3940716,-1.765957,-7.0734024,-11.7916514,-15.4409714,-17.8978349,-18.0760159,-16.6917856,-15.2785806,-11.8300438,-8.434842,-4.7363936,-1.258882,1.1050926,0.9930231,-1.8168434,-6.5076244,-12.560724,-16.9826325,-18.5447612,-17.4199231,-13.5246468,-9.013271,-4.0552186,0.2191476,2.9476146,3.0667159,1.7205641,-1.7033613,-4.3821348,-6.5505059,-7.8762842,-8.0560662,-6.2388775,-3.2530274]}

我得到了以下输出:

+----+----+---------+----+----+-----+----+----+----+----+-----+----+----+----+----+----+----+-----+
| _id|  Id|Timestamp|TTNR| SNR|State|I_A1|I_B1|  I1|Mabs|p_HD1|  pG|  pT| pXZ|  T3| nan|  Q1|a_01X|
+----+----+---------+----+----+-----+----+----+----+----+-----+----+----+----+----+----+----+-----+
|null|null|     null|null|null| null|null|null|null|null| null|null|null|null|null|null|null| null|
+----+----+---------+----+----+-----+----+----+----+----+-----+----+----+----+----+----+----+-----+

共有1个答案

葛成双
2023-03-14

您忘记将“时间戳”指定为结构类型。

这个片段对我有用。

import pyspark.sql.functions as fcn
import pandas as pd
from pyspark.sql.types import *
import json

schema = StructType(
    [
        StructField("_id",StringType(),True),
        StructField("Id",LongType(),True),
        StructField("Timestamp",StructType(
            [
                StructField("$date",LongType(),True)
            ]
        ), True),
        StructField("TTNR",StringType(),True),
        StructField("SNR",LongType(),True),
        StructField("State",LongType(),True),
        StructField("I_A1",StringType(),True),
        StructField("I_B1",StringType(),True),
        StructField("I1",DoubleType(),True),
        StructField("Mabs",DoubleType(),True),
        StructField("p_HD1",DoubleType(),True),
        StructField("pG",DoubleType(),True),
        StructField("pT",DoubleType(),True),
        StructField("pXZ",DoubleType(),True),
        StructField("T3",DoubleType(),True),
        StructField("nan",DoubleType(),True),
        StructField("Q1",LongType(),True),
        StructField("a_01X",ArrayType(DoubleType()),True)
      ]
)

example = {"_id": '{"$oid": "5eb56a371af2d82e242d24ae"}',"Id": 7,"Timestamp": {"$date": 1582889068586},"TTNR": "R902170286","SNR": 92177446,"State": 0,"I_A1": "FALSE","I_B1": "FALSE","I1": 0.0037385,"Mabs": -20.3711051,"p_HD1": 30.9632005,"pG": 27.788934,"pT": 1.7267373,"pXZ": 3.4487671,"T3": 25.2357555,"nan": 202.1999969,"Q1": 0,"a_01X": [62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925,62.7839925]}

data_stream_value = spark.createDataFrame(
    pd.DataFrame(
    {
      'foo': 'bar',
      'value': json.dumps(example)
    }, index=[0]
  )
)
jsonDF = data_stream_value.select(fcn.from_json(data_stream_value.value, schema).alias("json_detail"))
jsonDF = jsonDF.select("json_detail.*")
display(jsonDF)
 类似资料:
  • 问题内容: 我正在尝试从上述JSON检索邮政编码。我正在用gson解析它。我是JSON的新手,从我在这里的所有文章中读到的内容(有些与此类似),我都知道字段名称应保持原样。所以我知道我必须做出4类,即响应,视图,结果和地址。我使它们成为静态嵌套类,但是我只得到空值作为输出。在下一个JSON中,我有多个地址。但是我只停留在这个单一的回应上。 举一个简短的例子,我尝试使用此代码检索Timestamp,

  • 我想做的是使用Gson将嵌套的json数据解析为Java对象,并使用自定义的toString()将其打印出来。 Json内容 POJO类:示例 POJO类:JsonFormatter 我的POJO还有其他类,我试图将其解析为Gson的方式是: 但是当我试图打印出它的子值时,比如 我得到了错误: 我想实现的是将上面的json内容打印成这样: 谁能帮我解决这个问题吗?提前谢谢!

  • 问题内容: 我正在尝试使用具有以下结构的Java中的gson解析一些JSON数据,但是通过在线查看示例,我找不到任何能完成此工作的东西。 有人可以协助吗? 问题答案: 您只需要创建一个Java类结构即可表示JSON中的数据。为了做到这一点,我建议您将JSON复制到此在线JSON Viewer中 ,您会发现JSON的结构更加清晰… 基本上,您需要这些类(伪代码): 请注意,您的类中的属性名称必须与J

  • 问题内容: 此JSON输出来自MongoDB聚合查询。我本质上需要将嵌套数据JSON解析为以下’ 和值。 我尝试了5种不同的技术来从中获得所需的信息,但是使用和模块却遇到了问题。 理想情况下,输出将是这样的: 问题答案: 注意:来自MongoDB的JSON响应实际上无效。JSON需要双引号(),而不是单引号()。 我不确定为什么您的响应中有单引号而不是双引号,但是从其外观上,您可以替换它们,然后只

  • 问题内容: 我正在尝试解析这种结构:(它使我发疯,并且我尝试了我能想到的一切。但是我不是很有经验) “ topDrop”就像文件名吗?player是一个JSONArray,包含5个播放器JSONObject。但是在JSON术语中,最重要的是什么。我在JSON验证程序上签出有效的凭证,我需要这样的凭证: topDrop作为JSONObject Player,作为JSONArray,并循环遍历数组中的

  • 问题内容: 我正在将我的第一个应用程序构建在骨干网中,我想知道哪种方法是解析具有多个级别的json的最佳模式。这是一个简单的json小示例: 要打印它,我正在使用collection并在主干中查看,如下所示:COLLECTION: 这就是称为视图的两个视图,因为我想要的每个酒店都有不同的视图: 我的模板是: 但是不打印名称,我也尝试过: 但是我无法打印值名称,该怎么做?谢谢 问题答案: 首先,JS