我使用的是pyspark 2.2,有以下模式
root
|-- col1: string (nullable = true)
|-- col2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- metadata: map (nullable = true)
| | | |-- key: string
| | | |-- value: string (valueContainsNull = true)
和数据
+----+----------------------------------------------+
|col1|col2 |
+----+----------------------------------------------+
|A |[[id1, [k -> v1]], [id2, [k2 -> v5, k -> v2]]]|
|B |[[id3, [k -> v3]], [id4, [k3 -> v6, k -> v4]]]|
+----+----------------------------------------------+
col2
是一个复杂的结构。它是一个结构数组,每个结构都有两个元素,一个id
字符串和一个元数据
映射。(这是一个简化的数据集,真实数据集在结构中具有 10 个元素,在元数据
字段中有 10 个键值对)。
我想创建一个查询,返回与我的过滤逻辑相匹配的数据帧(比如< code>col1 == 'A'和< code>col2.id == 'id2'和< code > col 2 . metadata . k = = ' v2 ' )。
结果如下所示,过滤逻辑最多可以匹配数组中的一个结构,因此在第二列中,它只是一个结构,而不是一个结构的数组
+----+--------------------------+
|col1|col2_filtered |
+----+--------------------------+
|A |[id2, [k2 -> v5, k -> v2]]|
+----+--------------------------+
我知道如何通过爆炸
来实现这一点,但问题是col2
通常有超过100个结构,并且最多会有一个与我的过滤逻辑相匹配,所以我不认为爆炸
是一个可扩展的解决方案。
有人可以告诉我如何做到这一点,提前感谢!
下面是用于设置内容的代码块。
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType
schema = StructType([
StructField('col1', StringType(), True),
StructField('col2', ArrayType(
StructType([
StructField('id', StringType(), True),
StructField('metadata', MapType(StringType(), StringType()), True)
])
))
])
data = [
('A', [('id1', {'k': 'v1'}), ('id2', {'k': 'v2', 'k2': 'v5'})]),
('B', [('id3', {'k': 'v3'}), ('id4', {'k': 'v4', 'k3': 'v6'})])
]
df = spark.createDataFrame(data=data, schema=schema)
除了@mck的解决方案,我在搜索后尝试了另外三种方法,都得到了预期的结果。
udf
进行筛选并返回匹配的结构df.filter(df.col1 == 'A') \
.select(df.col1, udf(lambda a: [s for s in a if s.id == 'id2' and s.metadata['k'] == 'v2'], df.schema['col2'].dataType)('col2')[0].alias('col2_filtered')) \
.na.drop('any')
df.filter(df.col1 == 'A') \
.select(df.col1, df.col2.getItem(udf(lambda a: [i for i, s in enumerate(a) if s.id == 'id2' and s.metadata['k'] == 'v2'], ArrayType(IntegerType(), True))(df.col2)[0]).alias('col2_filtered')) \
.na.drop('any')
df.filter(df.col1 == 'A') \
.select(df.col1, expr("filter(col2, s -> s.id == 'id2' AND s.metadata['k'] == 'v2')").getItem(0).alias('col2_filtered')) \
.na.drop('any')
编辑:你可以试试UDF:
import pyspark.sql.functions as F
df2 = df.filter(
F.udf(lambda x: any([y.id == 'id2' and 'k' in y.metadata.keys() for y in x]), 'boolean')('col2')
).withColumn(
'col2',
F.udf(lambda x: [y for y in x if y.id == 'id2' and 'k' in y.metadata.keys()][0], 'struct<id:string,metadata:map<string,string>>')('col2')
)
df2.show(truncate=False)
+----+--------------------------+
|col1|col2 |
+----+--------------------------+
|A |[id2, [k2 -> v5, k -> v2]]|
+----+--------------------------+
您可以将列转换为JSON,并检查col2是否包含所需的JSON:
import pyspark.sql.functions as F
df2 = df.filter(
(F.col('col1') == 'A') &
F.to_json('col2').contains(
F.to_json(
F.struct(
F.lit('id2').alias('id'),
F.create_map(F.lit('k'), F.lit('v2')).alias('metadata')
)
)
)
)
df2.show(truncate=False)
+----+------------------------------------+
|col1|col2 |
+----+------------------------------------+
|A |[[id1, [k -> v1]], [id2, [k -> v2]]]|
+----+------------------------------------+
如果只想保留col2中的匹配结构,可以使用< code>withColumn替换它:
df3 = df2.withColumn(
'col2',
F.struct(
F.lit('id2').alias('id'),
F.create_map(F.lit('k'), F.lit('v2')).alias('metadata')
)
)
df3.show()
+----+----------------+
|col1| col2|
+----+----------------+
| A|[id2, [k -> v2]]|
+----+----------------+
提取过滤器用于过滤数据源中提取的数据。如果用户从数据源中提取数据,则使用此过滤器。 将文本文件连接到Tableau后,可以在数据源选项卡的右上角看到两个选项“实时(Live)”和“提取(Extract)”。 实时连接直接连接到数据源。提取连接从数据源中提取数据,并在Tableau存储库中创建本地副本。下面逐步给出创建提取过滤器的过程。 第1步:使用Tableau连接文本文件。 单击“提取(Extr
问题内容: 我想做类似的事情: Python的标准库中是否有类似行为? 我知道在这里自己动手很容易,但是我正在寻找一种更标准的方法。 问题答案: 您可以使用filter方法: 或列表理解: 要查找单个元素,可以尝试: 尽管如果没有匹配项将引发异常,因此您可能希望将其包装在try / catch中。方括号()使之成为生成器表达式,而不是列表理解。 就我个人而言,尽管我只是使用常规的过滤器/理解并采用
问题内容: 我有一个MySql表,我想查询其中 成对 的列在特定集中的行。例如,假设我的表格如下所示: 现在,我希望提取其中 (f1,f2) 对为(’a’,30)或(’b’,20)的行,即行2,3,4。我也希望使用“ IN”样式的过滤器来完成此操作,因为我可能有很多对要提取。如果我尝试类似的方法: 我得到IN子句中为f1和f2指定的值的笛卡尔积,即具有f1 =’a’或’b’的所有可能组合的行,以及
问题内容: 我正在尝试过滤具有作为行值的PySpark数据框: 我可以使用字符串值正确过滤: 但这失败了: 但是每个类别上肯定都有价值。这是怎么回事? 问题答案: 您可以使用/ : 如果你想简单地丢弃值,您可以使用与参数: 基于等式的比较将无法正常工作,因为在SQL中未定义,因此任何将其与另一个值进行比较的尝试都将返回: 与值进行比较的唯一有效方法是/ ,它等效于/方法调用。
我已经生成了<code>pyspark.sql.dataframe。DataFrame带有列名称cast和score。 但是,我想在cast列中保留唯一的名字,而不是与它们相关联的id,与_score列并排。 列中数据类型的结构如下所示 有人可以帮助我如何提取只有演员姓名和分数列的数据。 提前致谢
我试图子集一个熊猫DataFrame在python基于两个逻辑语句 即。 但是第3行的语法无效。 有没有一种方法可以在一行中完成?