有没有办法选择整行作为一列输入到Pyspark过滤器udf中?
我有一个复杂的过滤函数“my_filter”,我想应用于整个数据帧:
my_filter_udf = udf(lambda r: my_filter(r), BooleanType())
new_df = df.filter(my_filter_udf(col("*"))
但是
col("*")
引发错误,因为这不是有效的操作。
我知道我可以将数据帧转换为RDD,然后使用RDD的过滤方法,但我不想将其转换为RDD,然后再转换回数据帧。我的数据帧具有复杂的嵌套类型,因此当我尝试再次将 RDD 转换为数据帧时,架构推断将失败。
您应该静态地编写所有列。例如:
from pyspark.sql import functions as F
# create sample df
df = sc.parallelize([
(1, 'b'),
(1, 'c'),
]).toDF(["id", "category"])
#simple filter function
@F.udf(returnType=BooleanType())
def my_filter(col1, col2):
return (col1>0) & (col2=="b")
df.filter(my_filter('id', 'category')).show()
结果:
+---+--------+
| id|category|
+---+--------+
| 1| b|
+---+--------+
如果您有如此多的列,并且您确定要对列进行排序:
cols = df.columns
df.filter(my_filter(*cols)).show()
产生相同的输出。
我是pyspark的新手,我来尝试做一些像下面这样的事情,为每个cookie调用一个函数Print细节,然后将结果写入文件。spark.sql查询返回正确的数据,我也可以将其序列化为文件。有人可以帮助每个cookie上的for语句。调用UDF的语法应该是什么,如何将输出写入文本文件? 任何帮助是值得赞赏的。谢谢
我正在尝试在PySpark中为两个数据框(df1和df2)创建自定义连接(类似于此),代码如下所示: 我得到的错误消息是: 有没有办法编写一个可以处理来自两个单独数据帧的列的 PySpark UDF?
问题内容: 我正在尝试过滤具有作为行值的PySpark数据框: 我可以使用字符串值正确过滤: 但这失败了: 但是每个类别上肯定都有价值。这是怎么回事? 问题答案: 您可以使用/ : 如果你想简单地丢弃值,您可以使用与参数: 基于等式的比较将无法正常工作,因为在SQL中未定义,因此任何将其与另一个值进行比较的尝试都将返回: 与值进行比较的唯一有效方法是/ ,它等效于/方法调用。
我想循环两个列表,将组合传递给函数,并获得以下输出: 由于这是Pyspark,我想将其并行化,因为函数的每个迭代都可以独立运行。 注:我的实际函数是pyspark中的一个长而复杂的算法。只是想贴一个简单的例子来概括。 最好的方法是什么?
我正在为具有大量内部结构的复杂JSON数据集编写过滤函数。传递单个列太麻烦了。 因此,我声明了以下UDF: 直觉上,我认为它会这样工作: 实际的语法是什么?
问题内容: 我有包含单列和两列的数据行。我要做的是提取仅包含2列的行。 仅产生: 请注意,它们是制表符分隔的,即使对于只有一列的行,您也可以在其开头使用制表符。 怎么做呢? 我尝试了这个但是失败了: 问题答案: 您需要使用(字段数)变量来控制操作,例如以下记录: 如果字段数为2,将打印该行,否则将不执行任何操作。我之所以具有(看似)奇怪的构造,是因为如果没有任何规则匹配一行,则默认情况下将打印的某