当前位置: 首页 > 知识库问答 >
问题:

如何使用PYSPARK处理关系数据库中的JSON字段?

充阳秋
2023-03-14

我正在尝试处理PostgreSQL数据库中的JSON列。我可以使用以下方式连接到数据库:

import os
import pyspark
import findspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
findspark.init(os.environ['SPARK_HOME'])

# DB credentials
user = os.environ['EVENTS_DEV_UID']
password = os.environ['EVENTS_DEV_PWD']
host = os.environ['EVENTS_DEV_HOST']
port = os.environ['EVENTS_DEV_PORT']
db = os.environ['EVENTS_DEV_DBNAME']

# Initiate spark session
sc = SparkContext()
spark = SQLContext(sc)

# Set properties
properties = {"user": user, "password": password, "driver": "org.postgresql.Driver"}

# Load data
df = spark.read.jdbc(
    url = 'jdbc:postgresql://' + host + ':' + port + '/' + db,
    table = 'events',
    properties = properties)

问题始于铸造JSON字段。Spark不能识别params的结构格式。当我打印模式时:

df.printSchema()

root
|--time: time(nullable=true)
|--name: string(nullable=true)
|--params: string(nullable=true)

当我尝试将字符串强制转换为struct时:

df = df.withColumn('params', df.params.cast('struct'))

我遇到以下错误:

ParseException: '\nDataType struct is not supported.(line 1, pos 0)\n\n== SQL ==\nstruct\n^^^\n'

我想问题在于转义字符。有人知道如何进行吗?

共有1个答案

公良信然
2023-03-14

结构不是有效的转换类型。您可以使用python的json.loads函数定义自己的UDF。让我们从一个示例数据帧开始:

df = sc.parallelize([[1, "a", "{\"a\":1, \"b\":2}"], [2, "b", "{\"a\":3, \"b\":4}"]])\
    .toDF(["col1", "col2", "json_col"])
df.show()

    +----+----+--------------+
    |col1|col2|      json_col|
    +----+----+--------------+
    |   1|   a|{"a":1, "b":2}|
    |   2|   b|{"a":3, "b":4}|
    +----+----+--------------+

然后,输出StructType将具有模式:

from pyspark.sql.types import IntegerType, StructField, StructType
schema = StructType([StructField("a", IntegerType()), StructField("b", IntegerType())])

不能将StringType强制转换为StructType,因此UDF

import pyspark.sql.functions as psf
import json
json_load = psf.udf(json.loads, schema)

现在我们可以处理json_col

df_parsed = df.withColumn("parsed_json", json_load("json_col"))
df_parsed.show()
df_parsed.printSchema()

    +----+----+--------------+-----------+
    |col1|col2|      json_col|parsed_json|
    +----+----+--------------+-----------+
    |   1|   a|{"a":1, "b":2}|      [1,2]|
    |   2|   b|{"a":3, "b":4}|      [3,4]|
    +----+----+--------------+-----------+

    root
     |-- col1: long (nullable = true)
     |-- col2: string (nullable = true)
     |-- json_col: string (nullable = true)
     |-- parsed_json: struct (nullable = true)
     |    |-- a: integer (nullable = true)
     |    |-- b: integer (nullable = true)

您还可以尝试在加载数据帧时直接传递模式。

 类似资料:
  • 问题内容: 让我们看一个例子-书。一本书可以有1..n位作者。作者可以拥有1..m本书。代表一本书的所有作者的一种好方法是什么? 我想到了一个创建Books表和Authors表的想法。Authors表具有一个主要AuthorID密钥,即作者的姓名。图书表具有主要的图书ID和有关图书的元数据(书名,出版日期等)。但是,需要一种将书籍链接到作者以及将作者链接到书籍的方法。这就是问题所在。 假设我们有三

  • 问题内容: 我有很多关于在Android应用程序中处理异步数据库的问题。 由于我知道数据库是异步的,因此我尝试了几种方法来处理它。如您在代码中所见,我有两个函数需要在数据库中使用数组。我的第一个函数()将对数据库中的数组应用更改,而我的第二个函数()需要将此数组与从我的第一个函数中应用的更改一起使用。这是我的代码: 这是setArray_for_database的代码: 这是我的把戏。具有另一个内

  • 下面是我得到的API请求的JSON响应。 user={'name':'Siva','address':'my address','pincode':12345,'url':'http://myweb.com/index.php?title=firstname:lastname中间名 由于此JSON响应以user=开头,因此它既不是JSONObject也不是JSONArray。所以我认为这是字符串,

  • 事务的概念 事务的概念来自于两个独立的需求:并发数据库访问,系统错误恢复。 一个事务是可以被看作一个单元的一系列SQL语句的集合。 事务的特性(ACID) A, atomacity 原子性 事务必须是原子工作单元;对于其数据修改,要么全都执行,要么全都不执行。通常,与某个事务关联的操作具有共同的目标,并且是相互依赖的。如果系统只执行这些操作的一个子集,则可能会破坏事务的总体目标。原子性消除了系统处

  • 问题内容: 在数据库中建立适当的关系对数据完整性以外的其他功能没有帮助吗? 它们会改善还是阻碍性能? 问题答案: 我不得不说,适当的关系将比省略它们更好地帮助人们理解数据(或数据的意图),特别是因为维护它们的总成本非常低。 它们的存在不会影响性能,除非是在体系结构方面(正如其他人指出的那样,数据完整性有时会导致外键冲突,这可能会产生某些影响),但是IMHO的许多好处(如果正确使用,则不胜枚举)。

  • 我在Jena SDB上研究SPARQL查询性能。对于持久性,Jena SDB将三元组存储在关系数据库(例如,MySQL)中。Jena SDB如何处理SPARQL查询?是否: null