当前位置: 首页 > 知识库问答 >
问题:

PySpark在每一行DataFrame上执行普通Python函数

从经略
2023-03-14
col1 | col2 | col3 | ... | colN
--------------------------------
v11  | v12  | v13  | ... | v1N
v21  | v22  | v23  | ... | v2N
...  | ...  | ...  | ... | ...
func_name | func_body
-----------------------------------------------
func1     |   col2 < col45
func2     |   col11.contains("London") and col32*col15 < col21
funcN     |   .... 
col1 | col2 | col3 | ... | colN | tags
--------------------------------------
v11  | v12  | v13  | ... | v1N  | [func1, func76, funcN]
v21  | v22  | v23  | ... | v2N  | [func32]
...  | ...  | ...  | ... | ...  | [..., ..., ..., ..., ...]

PySpark有可能吗?如果有,你能展示一个如何实现的例子吗?以mapfromdf.columns作为输入参数的UDF函数是正确的方法,还是可以以更简单的方式完成?Spark对一个时间点可以注册多少UDF函数(数量)有任何限制吗?

共有1个答案

越姚石
2023-03-14
data1 = [(1, "val1", 4, 5, "A", 10), (0, "val2", 7, 8, "B", 20),
         (9, "val3", 8, 1, "C", 30), (10, "val4", 2, 9, "D", 30),
         (20, "val5", 6, 5, "E", 50), (3, "val6", 100, 2, "X", 45)]

df1 = spark.createDataFrame(data1, ["col1", "col2", "col3", "col4", "col5", "col6"])

data2 = [("func1", "col1 + col3 = 5 and col2 like '%al1'"),
         ("func2", "col6 = 30 or col1 * col4 > 20"),
         ("func3", "col5 in ('A', 'B', 'C') and col6 - col1 < 30"),
         ("func4", "col2 like 'val%' and col1 > 0")]

df2 = spark.createDataFrame(data2, ["func_name", "func_body"])

# get functions into a list
functions = df2.collect()

# case/when expression to evaluate the functions
satisfied_expr = [when(expr(f.func_body), lit(f.func_name)) for f in functions]

# add new column tags
df1.withColumn("tags", array(*satisfied_expr)) \
    .withColumn("tags", expr("filter(tags, x -> x is not null)")) \
    .show(truncate=False)
+----+----+----+----+----+----+---------------------+
|col1|col2|col3|col4|col5|col6|tags                 |
+----+----+----+----+----+----+---------------------+
|1   |val1|4   |5   |A   |10  |[func1, func3, func4]|
|0   |val2|7   |8   |B   |20  |[func3]              |
|9   |val3|8   |1   |C   |30  |[func2, func3, func4]|
|10  |val4|2   |9   |D   |30  |[func2, func4]       |
|20  |val5|6   |5   |E   |50  |[func2, func4]       |
|3   |val6|100 |2   |X   |45  |[func4]              |
+----+----+----+----+----+----+---------------------+
 类似资料:
  • 问题内容: 我想出于调试目的,打印出与python方法中执行的每一行有关的内容。 例如,如果该行中有一些赋值,我想打印为该变量分配的值,如果有一个函数调用,我想打印出该函数返回的值,等等。 因此,例如,如果我要使用装饰器,则将其应用于函数/方法,例如: 调用功能测试时,应打印以下内容: 有什么办法可以做到这一点?更重要的是,我想知道我是否可以编写可以逐行通过其他代码的代码,检查它是哪种类型的指令,

  • 本文向大家介绍将函数应用于Python中Pandas DataFrame中的每一行,包括了将函数应用于Python中Pandas DataFrame中的每一行的使用技巧和注意事项,需要的朋友参考一下 在本教程中,我们将学习列表的最常用方法,即append()和extend()。让我们一一看。 应用() 它用于将函数应用于DataFrame的每一行。例如,如果我们想将每个中的所有数字相乘并将其添加为

  • 问题内容: 我有一个具有维度(不包括索引)的DataFrame(df1),我想要为该行将每个行除以另一个具有维度的DataFrame(df2)。两者具有相同的列标题。我试过了: 以及 其他多个解决方案,我总是在每个单元格中获得包含值的df 。我在函数中缺少什么参数? 问题答案: 在中,您需要提供df2(例如)的轴/行。 或者你可以使用

  • 问题内容: 我想每60秒在Python上执行一个函数,但是我不想同时被阻塞。 如何异步进行? 使用此代码,函数f在20秒time.time中每3秒执行一次。最后,它给出了一个错误,我认为这是因为threading.timer尚未被取消。 如何取消呢? 提前致谢! 问题答案: 您可以尝试threading.Timer类:http : //docs.python.org/library/threadi

  • 我目前有一个数据表,其中一列类型为“a b c d e...”。将此列称为“COL4” 我想通过拆分col4的元素来将单行拆分为多行,同时保留所有其他列的值。 COL1[0]COL2[0]COL3[0]a b c 我希望输出为: COL1[0]COL2[0]COL3[0]a COL1[0]COL2[0]COL3[0]a b c 这不是我想要的。

  • 如果有一个数据帧,并希望根据行的值对函数中的数据进行一些操作。 有谁知道如何将我的udf应用于数据帧?