当前位置: 首页 > 知识库问答 >
问题:

将字符串类型的列转换为结构,并使用PySpark解压该列

姚星宇
2023-03-14

输入自由度:

+------+-----------------------------------------------------+
|rowNum|infoCol                                              |
+------+-----------------------------------------------------+
|100   |[('john', 'customer'), ('abc, mno, xyz', 'purchase')]|
|200   |[('doe', 'customer')]                                |
+------+-----------------------------------------------------+
root
 |-- rowNum: string (nullable = false)
 |-- infoCol: string (nullable = false)

(预期)输出DF:

+------+--------+-----------------+
|rowNum|customer|         purchase|
+------+--------+-----------------+
|   100|['john']|['abc, mno, xyz']|
|   100| ['doe']|             null|
+------+--------+-----------------+

我已经尝试使用< code>split函数,但它并不完全符合我的需求。

inputdf = spark.createDataFrame(
    [
        ("100", "[('john', 'customer'), ('abc, mno, xyz', 'purchase')]"),
        ("200", "[('doe', 'customer')]"),
    ],
    ['rowNum', 'infoCol'] 
)

from pyspark.sql.functions import col, regexp_replace, split
outputdf = inputdf.withColumn("newcol", split(col("infoCol"), ","))

共有2个答案

焦阎宝
2023-03-14

这是我的尝试,这可以用于许多列,不仅用于客户购买,而且如果列名称在最后。

import pyspark.sql.functions as f

df = inputdf \
  .withColumn('infoCol', f.regexp_replace('infoCol', '[\[\]]', '')) \
  .withColumn('infoCol', f.regexp_replace('infoCol', '(\),)', ') ,')) \
  .withColumn('infoCol', f.explode(f.split('infoCol', ' , '))) \
  .withColumn('infoCol', f.regexp_replace('infoCol', '[\(\)]', '')) \
  .withColumn('infoCol', f.regexp_replace('infoCol', '(\',)', '\' ,')) \
  .withColumn('cols', f.split('infoCol', ' , ')[1]) \
  .withColumn('cols', f.regexp_replace('cols', '\'', '')) \
  .withColumn('infoCol', f.split('infoCol', ' , ')[0]) \
  .withColumn('infoCol', f.concat(f.lit('['), f.col('infoCol'), f.lit(']'))) \

values = df.select('cols').distinct().rdd.map(lambda x: x.cols).collect()

df.groupBy('rowNum') \
  .pivot('cols', values) \
  .agg(f.first('infoCol')) \
  .show(10, False)

+------+--------+-----------------+
|rowNum|customer|purchase         |
+------+--------+-----------------+
|200   |['doe'] |null             |
|100   |['john']|['abc, mno, xyz']|
+------+--------+-----------------+
李良策
2023-03-14

这是我对火花内置函数的尝试。

这里的想法是首先创建2列与客户,购买作为值和其他值在另一列,以获得这些列我使用拆分然后爆炸。

一旦我们得到客户,购买值然后分组通过PIVot来透视数据,最终拆分列以获得数组。

示例:

inputdf = spark.createDataFrame(
    [
        ("100", "[('john', 'customer'), ('abc, mno, xyz', 'purchase')]"),
        ("200", "[('doe', 'customer')]"),
    ],
    ['rowNum', 'infoCol'] 
)

from pyspark.sql.functions import *

inputdf.withColumn("newcol", split(col("infoCol"), "\),")).\
selectExpr("explode(newcol)","rowNum").\
withColumn("newCol1",split(regexp_replace(col("col"),"[\[|\]|\(|\)]",""),"',")).\
withColumn("new1",regexp_replace(trim(element_at(col("newCol1"),1)),"[']","")).\
withColumn("new2",regexp_replace(trim(element_at(col("newCol1"),2)),"[']","")).\
groupby("rowNum").\
pivot("new2").\
agg(first(col("new1"))).\
withColumn("customer",split(col("customer"),",")).\
withColumn("purchase",split(col("purchase"),",")).\
show()

#+------+--------+-----------------+
#|rowNum|customer|         purchase|
#+------+--------+-----------------+
#|   200|   [doe]|             null|
#|   100|  [john]|[abc,  mno,  xyz]|
#+------+--------+-----------------+

更新:

inputdf = spark.createDataFrame(
    [
        ("100", "[('john', 'customer'), ('abc, mno, xyz', 'purchase')]"),
        ("200", "[('doe', 'customer')]"),
    ],
    ['rowNum', 'infoCol'] 
)

from pyspark.sql.functions import *

inputdf.withColumn("newcol", split(col("infoCol"), "\),")).\
selectExpr("explode(newcol)","rowNum").\
withColumn("newCol1",split(regexp_replace(col("col"),"[\[|\]|\(|\)]",""),"',")).\
withColumn("new1",regexp_replace(trim(element_at(col("newCol1"),1)),"[']","")).\
withColumn("new2",regexp_replace(trim(element_at(col("newCol1"),2)),"[']","")).\
groupby("rowNum").\
pivot("new2").\
agg(first(col("new1"))).\
withColumn("customer",col("customer")).\
withColumn("purchase",col("purchase")).\
show()

#+------+--------+-------------+
#|rowNum|customer|     purchase|
#+------+--------+-------------+
#|   200|     doe|         null|
#|   100|    john|abc, mno, xyz|
#+------+--------+-------------+

更新2:

inputdf = spark.createDataFrame(
    [
        ("100", "[('john', 'customer'), ('abc, mno, xyz', 'purchase'), ('abc123', 'purchase')]"),
        ("200", "[('doe', 'customer')]"),
    ],
    ['rowNum', 'infoCol'] 
)

from pyspark.sql.functions import *


inputdf.withColumn("newcol", split(col("infoCol"), "\),")).\
selectExpr("explode(newcol)","rowNum").\
withColumn("newCol1",expr("""transform(split(regexp_replace(col,"[\[|\]|\(|\)]",""),"',"),x -> regexp_replace(trim(x),"[']",""))""")).\
withColumn("new1",regexp_replace(element_at(col("newCol1"),-1),"[\]]","")).\
withColumn("new2",array_except(col("newCol1"),array(lit('purchase'),lit('customer'),lit('purchase]'),lit('customer]')))).\
withColumn("new2",expr("""transform(new2,x -> concat("'",regexp_replace(x,"[\\\\[]",""),"'"))""")).\
drop(*['col','newCol1']).\
groupby("new1","rowNum").agg(flatten(collect_list(col("new2"))).alias("new2")).\
groupby("rowNum").pivot("new1").agg(first(col("new2"))).\
show(10,False)

#+------+--------+---------------------------+
#|rowNum|customer|purchase                   |
#+------+--------+---------------------------+
#|200   |['doe'] |null                       |
#|100   |['john']|['abc, mno, xyz', 'abc123']|
#+------+--------+---------------------------+
 类似资料:
  • 我的一个数据帧(spark.sql)有这个模式。 我需要将其保存到CSV文件,但不使用任何扁平化,以以下格式分解。 我直接使用了命令 ,这符合我的目的,但我需要一个更好的方法。我正在使用派斯帕克

  • 问题内容: 给定python字符串列表,如何自动将其转换为正确的类型?意思是,如果我有: 我希望将其转换为列表 其中第一个元素是stuntng,第二个元素是int,第三个元素是float,第四个元素是int。 我怎样才能做到这一点?谢谢。 问题答案: import ast

  • 方法(下面)是一个类型,我用作参数的类是一个。我想知道我选角的方式是否管用: 这就是方法: 此URL指向类: 我把它放进了粘贴箱,因为课程太长了。

  • 问题内容: 我有pyspark数据框,其中包含名为 Filters 的列:“ array>” 我想将数据帧保存在csv文件中,为此,我需要将数组转换为字符串类型。 我尝试将其强制转换为:和,但是这两种解决方案都会为“过滤器”列中的每一行生成错误消息: 代码如下 样本JSON数据: 谢谢 !! 问题答案: 我创建了一个样本JSON数据集来匹配该模式: 使用explode()函数可以最佳化解决您的问题

  • 问题内容: 我想通过使用struct / interface的字符串名称值将特定变量转换为特定的已定义struct / interface。 例如: 和新变量 这可能是偶然的吗?也许使用反射? 干杯 问题答案: 这不可能。Go是一种静态类型的语言,这意味着必须在编译时知道变量和表达式的类型。 在类型断言中: […]如果类型断言成立,则表达式的值为存储在其中的值,其类型为。 因此,您可以使用类型断言

  • 问题内容: 我有以下数据框: 现在,我想将Vacationdate列的数据类型更改为String,以便数据框也采用这种新类型并覆盖所有条目的数据类型数据。例如写后: Vacationdate的数据类型应被覆盖。 我已经使用过诸如cast,StringType或astype之类的函数,但是我没有成功。你知道怎么做吗? 问题答案: 让我们创建一些虚拟数据: 如果Spark> = 1.5.0,则可以使用