当前位置: 首页 > 知识库问答 >
问题:

更新UDF Pyspark中的变量值

沈弘文
2023-03-14

我想要一个udf函数,它遍历列“Values”,并检查下一个值是否是当前行值的50%或更多。如果它在50%之内,那么我希望包含值“是”,如果不是,那么我不希望包含值。如果该值在最后一个值和下一个值之间下降得太快,则不应将其包括在内,但如果该值逐渐下降,且与最后一个包括的值相比不超过50%,则可以。这就是为什么。未包括id 5的1,但。包含id 9的1,因为它遵循的值从逐渐下降。4不超过50%。我曾考虑在自定义项中使用一个变量来跟踪最后一个可接受的值,但我不知道该怎么做。

rows = sc.parallelize([[1, .9, 'yes'], [2, .7, 'yes'], [3, .4, 'yes'], [4, .15, 'no'], [5, .1, 'no'], [7, .3, 'yes'], [8, .2, 'yes'], [9, .1, 'yes']])

rows_df = rows.toDF(["ID",  'Values', 'Include'])

#preview data
rows_df.show()

#show data schema
rows_df.printSchema()

+---+------+-------+
| ID|Values|Include|
+---+------+-------+
|  1|   0.9|    yes|
|  2|   0.7|    yes|
|  3|   0.4|    yes|
|  4|  0.15|     no|
|  5|   0.1|     no|
|  7|   0.3|    yes|
|  8|   0.2|    yes|
|  9|   0.1|    yes|
+---+------+-------+

共有1个答案

子车俊材
2023-03-14

为了实现您的目标,您不必使用UDF(事实上我认为这是不可能的),您可以改为使用在窗口上工作的各种函数,例如lag

我必须承认我不完全理解你的要求(为什么5。应该是‘否’?),但是在滞后、领先和最后之间,你应该能够实现它。你可以在文档中阅读更多关于它们的信息。基于先前值执行逻辑的示例:

from pyspark.sql import Window
from pyspark.sql.functions import col, lag, when, lit

windowSpec = Window.orderBy("Id")

withPrevious = rows_df.withColumn("prevVal", lag(rows_df["Values"]).over(windowSpec))

withPrevious.withColumn("Include2", 
                        when(col("prevVal").isNull(), "yes")\
                        .when(col("Values") >= 0.5 * col("prevVal"), lit("yes"))\
                        .otherwise("no"))\
    .show()
+---+------+-------+-------+--------+
| ID|Values|Include|prevVal|Include2|
+---+------+-------+-------+--------+
|  1|   0.9|    yes|   null|     yes|
|  2|   0.7|    yes|    0.9|     yes|
|  3|   0.4|    yes|    0.7|     yes|
|  4|  0.15|     no|    0.4|      no|
|  5|   0.1|     no|   0.15|     yes|
|  7|   0.3|    yes|    0.1|     yes|
|  8|   0.2|    yes|    0.3|     yes|
|  9|   0.1|    yes|    0.2|     yes|
+---+------+-------+-------+--------+
 类似资料:
  • 谁能帮我解决这个问题,或者给我指出正确的方向? 我正在使用Python 3.9.7 我的目标是创建一个python程序,使用Tkinter显示两个变量之间的时间差,我希望这个变量在倒计时时每秒更新一次。 我已经创建了TK窗口,其中显示了标题文本和timedif标签,但是当我的程序运行时,timedif标签不会更新。Timedif标签仅显示程序执行时的timedelta。 我定义了一个函数count

  • 我在同一个包中有两个java文件。我想将一个变量的更新值从一个文件传递到另一个文件。我编写了以下代码。在1班。java:- class2.java:- 问题是最终值打印是的,不应该打印。输出应该很好。请帮我。

  • App.js中有一个全局变量,该变量在我的应用程序组件之外设置。 它监视棋盘游戏正在进行的移动。在黑板下面是一些导航按钮。如果您单击

  • 问题内容: jQuery的 我正在发出一个AJAX请求,该请求会使用服务器的响应来更新变量()的值。这是我正在使用的代码: 问题是的值仍然是一个空字符串。我知道这不是服务器端脚本的问题,因为我会收到错误警报,或者至少得到string 。 这是一个演示问题的JSFiddle:http : //jsfiddle.net/GGDX7/ 为什么不变的价值? 纯JS 我正在发出一个AJAX请求,该请求会使用

  • 问题内容: 第一个System.out打印 2 并且应该打印,而第二个System打印 65 。我已经用这种语言编程了一年多了,据我所知这是不可能发生的!有什么帮助吗? 上面的代码在两行上都显示 9 。 问题答案: 当你这样做,之前仅仅是一个参考阵列, NO 新阵列已创建并分配给。因此,当您查看自己的价值时,基本上就是查看的价值,反之亦然。只是的别名。这就是为什么在第二张照片中您得到65。 检查该

  • 问题内容: 我正在尝试使用Meteor进行如下更新: 但是我在努力如何动态设置方向的数组索引,就像这样: 这不起作用,因为[index]被包装在字符串中。我还尝试形成一个自定义字符串,如下所示: 但这是行不通的。关于如何执行此操作的任何想法? 问题答案: 您需要以编程方式构建对象: 更新资料 如果您的JavaScript环境支持计算出的属性名称(例如,node.js 4+),则可以一步完成: