输入自由度:
+------+-----------------------------------------------------+
|rowNum|infoCol |
+------+-----------------------------------------------------+
|100 |[('john', 'customer'), ('abc, mno, xyz', 'purchase')]|
|200 |[('doe', 'customer')] |
+------+-----------------------------------------------------+
root
|-- rowNum: string (nullable = false)
|-- infoCol: string (nullable = false)
(预期)输出DF:
+------+--------+-----------------+
|rowNum|customer| purchase|
+------+--------+-----------------+
| 100|['john']|['abc, mno, xyz']|
| 100| ['doe']| null|
+------+--------+-----------------+
我已经尝试使用< code>split函数,但它并不完全符合我的需求。
inputdf = spark.createDataFrame(
[
("100", "[('john', 'customer'), ('abc, mno, xyz', 'purchase')]"),
("200", "[('doe', 'customer')]"),
],
['rowNum', 'infoCol']
)
from pyspark.sql.functions import col, regexp_replace, split
outputdf = inputdf.withColumn("newcol", split(col("infoCol"), ","))
这是我的尝试,这可以用于许多列,不仅用于客户
,购买
,而且如果列名称在最后。
import pyspark.sql.functions as f
df = inputdf \
.withColumn('infoCol', f.regexp_replace('infoCol', '[\[\]]', '')) \
.withColumn('infoCol', f.regexp_replace('infoCol', '(\),)', ') ,')) \
.withColumn('infoCol', f.explode(f.split('infoCol', ' , '))) \
.withColumn('infoCol', f.regexp_replace('infoCol', '[\(\)]', '')) \
.withColumn('infoCol', f.regexp_replace('infoCol', '(\',)', '\' ,')) \
.withColumn('cols', f.split('infoCol', ' , ')[1]) \
.withColumn('cols', f.regexp_replace('cols', '\'', '')) \
.withColumn('infoCol', f.split('infoCol', ' , ')[0]) \
.withColumn('infoCol', f.concat(f.lit('['), f.col('infoCol'), f.lit(']'))) \
values = df.select('cols').distinct().rdd.map(lambda x: x.cols).collect()
df.groupBy('rowNum') \
.pivot('cols', values) \
.agg(f.first('infoCol')) \
.show(10, False)
+------+--------+-----------------+
|rowNum|customer|purchase |
+------+--------+-----------------+
|200 |['doe'] |null |
|100 |['john']|['abc, mno, xyz']|
+------+--------+-----------------+
这是我对火花内置
函数的尝试。
这里的想法是首先创建2列
与客户,购买
作为值和其他值在另一列,以获得这些列我使用拆分然后爆炸。
一旦我们得到客户,购买
值然后分组通过PIV
ot来透视数据,最终拆分列以获得数组。
示例:
inputdf = spark.createDataFrame(
[
("100", "[('john', 'customer'), ('abc, mno, xyz', 'purchase')]"),
("200", "[('doe', 'customer')]"),
],
['rowNum', 'infoCol']
)
from pyspark.sql.functions import *
inputdf.withColumn("newcol", split(col("infoCol"), "\),")).\
selectExpr("explode(newcol)","rowNum").\
withColumn("newCol1",split(regexp_replace(col("col"),"[\[|\]|\(|\)]",""),"',")).\
withColumn("new1",regexp_replace(trim(element_at(col("newCol1"),1)),"[']","")).\
withColumn("new2",regexp_replace(trim(element_at(col("newCol1"),2)),"[']","")).\
groupby("rowNum").\
pivot("new2").\
agg(first(col("new1"))).\
withColumn("customer",split(col("customer"),",")).\
withColumn("purchase",split(col("purchase"),",")).\
show()
#+------+--------+-----------------+
#|rowNum|customer| purchase|
#+------+--------+-----------------+
#| 200| [doe]| null|
#| 100| [john]|[abc, mno, xyz]|
#+------+--------+-----------------+
更新:
inputdf = spark.createDataFrame(
[
("100", "[('john', 'customer'), ('abc, mno, xyz', 'purchase')]"),
("200", "[('doe', 'customer')]"),
],
['rowNum', 'infoCol']
)
from pyspark.sql.functions import *
inputdf.withColumn("newcol", split(col("infoCol"), "\),")).\
selectExpr("explode(newcol)","rowNum").\
withColumn("newCol1",split(regexp_replace(col("col"),"[\[|\]|\(|\)]",""),"',")).\
withColumn("new1",regexp_replace(trim(element_at(col("newCol1"),1)),"[']","")).\
withColumn("new2",regexp_replace(trim(element_at(col("newCol1"),2)),"[']","")).\
groupby("rowNum").\
pivot("new2").\
agg(first(col("new1"))).\
withColumn("customer",col("customer")).\
withColumn("purchase",col("purchase")).\
show()
#+------+--------+-------------+
#|rowNum|customer| purchase|
#+------+--------+-------------+
#| 200| doe| null|
#| 100| john|abc, mno, xyz|
#+------+--------+-------------+
更新2:
inputdf = spark.createDataFrame(
[
("100", "[('john', 'customer'), ('abc, mno, xyz', 'purchase'), ('abc123', 'purchase')]"),
("200", "[('doe', 'customer')]"),
],
['rowNum', 'infoCol']
)
from pyspark.sql.functions import *
inputdf.withColumn("newcol", split(col("infoCol"), "\),")).\
selectExpr("explode(newcol)","rowNum").\
withColumn("newCol1",expr("""transform(split(regexp_replace(col,"[\[|\]|\(|\)]",""),"',"),x -> regexp_replace(trim(x),"[']",""))""")).\
withColumn("new1",regexp_replace(element_at(col("newCol1"),-1),"[\]]","")).\
withColumn("new2",array_except(col("newCol1"),array(lit('purchase'),lit('customer'),lit('purchase]'),lit('customer]')))).\
withColumn("new2",expr("""transform(new2,x -> concat("'",regexp_replace(x,"[\\\\[]",""),"'"))""")).\
drop(*['col','newCol1']).\
groupby("new1","rowNum").agg(flatten(collect_list(col("new2"))).alias("new2")).\
groupby("rowNum").pivot("new1").agg(first(col("new2"))).\
show(10,False)
#+------+--------+---------------------------+
#|rowNum|customer|purchase |
#+------+--------+---------------------------+
#|200 |['doe'] |null |
#|100 |['john']|['abc, mno, xyz', 'abc123']|
#+------+--------+---------------------------+
我的一个数据帧(spark.sql)有这个模式。 我需要将其保存到CSV文件,但不使用任何扁平化,以以下格式分解。 我直接使用了命令 ,这符合我的目的,但我需要一个更好的方法。我正在使用派斯帕克
问题内容: 我有pyspark数据框,其中包含名为 Filters 的列:“ array>” 我想将数据帧保存在csv文件中,为此,我需要将数组转换为字符串类型。 我尝试将其强制转换为:和,但是这两种解决方案都会为“过滤器”列中的每一行生成错误消息: 代码如下 样本JSON数据: 谢谢 !! 问题答案: 我创建了一个样本JSON数据集来匹配该模式: 使用explode()函数可以最佳化解决您的问题
问题内容: 给定python字符串列表,如何自动将其转换为正确的类型?意思是,如果我有: 我希望将其转换为列表 其中第一个元素是stuntng,第二个元素是int,第三个元素是float,第四个元素是int。 我怎样才能做到这一点?谢谢。 问题答案: import ast
方法(下面)是一个类型,我用作参数的类是一个。我想知道我选角的方式是否管用: 这就是方法: 此URL指向类: 我把它放进了粘贴箱,因为课程太长了。
问题内容: 我想通过使用struct / interface的字符串名称值将特定变量转换为特定的已定义struct / interface。 例如: 和新变量 这可能是偶然的吗?也许使用反射? 干杯 问题答案: 这不可能。Go是一种静态类型的语言,这意味着必须在编译时知道变量和表达式的类型。 在类型断言中: […]如果类型断言成立,则表达式的值为存储在其中的值,其类型为。 因此,您可以使用类型断言
问题内容: 我有以下数据框: 现在,我想将Vacationdate列的数据类型更改为String,以便数据框也采用这种新类型并覆盖所有条目的数据类型数据。例如写后: Vacationdate的数据类型应被覆盖。 我已经使用过诸如cast,StringType或astype之类的函数,但是我没有成功。你知道怎么做吗? 问题答案: 让我们创建一些虚拟数据: 如果Spark> = 1.5.0,则可以使用