当前位置: 首页 > 知识库问答 >
问题:

如何在Python中检查UDF函数中pyspark dataframe列的单元格值是否为none或NaN来实现正向填充?

史意致
2023-03-14

我基本上是在尝试做一个向前填充的归因。下面是它的代码。

df = spark.createDataFrame([(1,1, None), (1,2, 5), (1,3, None), (1,4, None), (1,5, 10), (1,6, None)], ('session',"timestamp", "id"))

PRV_RANK = 0.0
def fun(rank):
    ########How to check if None or Nan?  ###############
    if rank is None or rank is NaN:
        return PRV_RANK
    else:
        PRV_RANK = rank
        return rank        

fuN= F.udf(fun, IntegerType())

df.withColumn("ffill_new", fuN(df["id"])).show()

我在日志里发现了奇怪的错误。

编辑:问题与如何使用Python识别spark数据帧中的null&nan有关。

文件“C:\spark\python\pyspark\sql\dataframe.py”,第318行,显示打印(self._jdf.showstring(n,20))

文件“C:\spark\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py”,第1133行,在呼叫应答中,self.gateway_client,self.target_id,self.name)

文件“C:\spark\python\pyspark\sql\utils.py”,第63行,deco格式返回f(*a,**kw)

文件“C:\spark\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py”,第319行,格式为get_return_value(target_id,“.”,name,value)

PY4JJavaError:调用O806.ShowString时出错。:org.apache.spark.sparkException:由于阶段失败而中止的作业:阶段47.0中的任务0失败1次,最近的失败:阶段47.0中丢失的任务0.0(TID 83,本地主机,执行器驱动程序):org.apache.spark.api.python.pythonException:追溯(最近一次调用):文件“C:\spark\python\lib\pyspark.zip\pyspark\worker.py”,第174行,在主文件“C:\spark\python\lib\pyspark\worker.py”中,第169行,在进程文件“C:\spark\python\lib\pyspark.zip\pyspark\worker.py”中\worker.py“,第106行,在文件”C:\spark\python\lib\pyspark.zip\pyspark\worker.py“中,第92行,在文件”C:\spark\python\lib\pyspark.zip\pyspark\worker.py“中,第70行,在文件”“中,第5行,在forwardfil unboundLocalError中:赋值前引用的局部变量'prv_rank'

在org.apache.spark.api.python.pythonrunner$$anon$1.read(pythonrdd.scala:193)在org.apache.spark.api.python.pythonrunner$$anon$1.(pythonrdd.scala:234)在org.apache.spark.api.python.pythonrunner.compute(pythonrdd.scala:152)在org.apache.spark.api.python.python.patche.spark.sql.execution.python.batcheValpythonexecec$$anonfun$doexecution$1.apply(BatcheValpythonexecution.144)在BatcheValpythonexec$$anonfun$doexecute$1.apply(BatcheValpythonexec.scala:87)在org.apache.spark.rdd.rdd$$anonfun$apply$23。apply(Rdd.scala:797)在org.apache.spark.rdd.rdd$$anonfun$mappartitions$1$anonfun$apply$23。apply(Rdd.scala:797)在org.apache.spark.rdd.mappartitionsrdd.compute(Mappartitionsrdd.scala:38)在org.apache.spark.rdd.computeorreadcheckpoint(Rdd.scala:3)。23)在org.apache.spark.rdd.rdd.iterator(rdd.scala:287)在org.apache.spark.rdd.rdd.mappartitionsrdd.compute(mappartitionsrdd.scala:38)在org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323)在org.apache.spark.rdd.rdd.iterator(rdd.scala:287)在org.apache.spark.rdd.mappartitionsrdd.cala:38)在org.apache.spark.rdd.rdd.compute(mappartitionsrdd.scala:38)在org.apache.spark.rdd.computeorreadcheckpoint(rdd.scala:323)在org.apache.spark.rdd.rdd.iterator(rdd.scala:287)在org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:87)在org.apache.spark.scheduler.task.run(task.scala:99)在krunner.run(executor.scala:322)在java.util.concurrent.ThreadPoolExecutor.runworker(ThreadPoolExecutor.java:1142)在java.util.concurrent.ThreadPoolExecutor$worker.run(ThreadPoolExecutor.java:617)在java.lang.Thread.run(Thread.java:748)

在org.apache.spark.api.python.pythonrunner$$anon$1.read(pythonrdd.scala:193)在org.apache.spark.api.python.pythonrunner$$anon$1.(pythonrdd.scala:234)在org.apache.spark.api.python.pythonrunner.compute(pythonrdd.scala:152)在org.apache.spark.api.python.python.patche.spark.sql.execution.python.batcheValpythonexecec$$anonfun$doexecution$1.apply(BatcheValpythonexecution.144)在BatcheValpythonexec$$anonfun$doexecute$1.apply(BatcheValpythonexec.scala:87)在org.apache.spark.rdd.rdd$$anonfun$apply$23。apply(Rdd.scala:797)在org.apache.spark.rdd.rdd$$anonfun$mappartitions$1$anonfun$apply$23。apply(Rdd.scala:797)在org.apache.spark.rdd.mappartitionsrdd.compute(Mappartitionsrdd.scala:38)在org.apache.spark.rdd.computeorreadcheckpoint(Rdd.scala:3)。23)在org.apache.spark.rdd.rdd.iterator(rdd.scala:287)在org.apache.spark.rdd.rdd.mappartitionsrdd.compute(mappartitionsrdd.scala:38)在org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323)在org.apache.spark.rdd.rdd.iterator(rdd.scala:287)在org.apache.spark.rdd.mappartitionsrdd.cala:38)在org.apache.spark.rdd.rdd.compute(mappartitionsrdd.scala:38)在org.apache.spark.rdd.computeorreadcheckpoint(rdd.scala:323)在org.apache.spark.rdd.rdd.iterator(rdd.scala:287)在org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:87)在org.apache.spark.scheduler.task.run(task.scala:99)在krunner.run(executor.scala:322)在java.util.concurrent.ThreadPoolExecutor.runworker(ThreadPoolExecutor.java:1142)在java.util.concurrent.ThreadPoolExecutor$worker.run(ThreadPoolExecutor.java:617)1更多

共有1个答案

田镜
2023-03-14
df.withColumn("ffill_new", f.UserDefinedFunction(lambda x: x or 0, IntegerType())(df["id"])).show()
 类似资料:
  • 问题内容: 我只想检查Pandas系列中的单个单元格是否为null,即检查值是否为。 所有其他答案适用于序列和数组,但不适用于单个值。 我已经试过,,。是否只有一个单一值的解决方案? 问题答案: 尝试这个:

  • 问题内容: 在Python Pandas中,检查DataFrame是否具有一个(或多个)NaN值的最佳方法是什么? 我知道函数,但是这会为每个元素返回一个布尔值的DataFrame。此处的帖子也无法完全回答我的问题。 问题答案: jwilner的反应是现场的。我一直在探索是否有更快的选择,因为根据我的经验,求平面数组的总和(奇怪)比计数快。这段代码似乎更快: 速度稍慢,但当然还有其他信息-的数量。

  • 在Python Pandas中,检查DataFrame是否有一个(或多个)NaN值的最佳方法是什么? 我知道函数,但它为每个元素返回布尔值的数据集。这里的这篇文章也没有完全回答我的问题。

  • 问题内容: 我只是在Firefox的JavaScript控制台中尝试过,但是以下任何语句都不返回true: 问题答案: 试试这个代码:

  • 问题内容: 可以说我有以下内容 : 看起来像: 第一选择 我知道一种检查特定值是否为的方法,如下所示: 第二种选择(不起作用) 我认为下面的选项using可以正常工作,但事实并非如此: 我也尝试了相同的结果: 但是,如果我使用或检查这些值,则会得到: 那么, 为什么第二种选择不起作用? 是否可以使用或检查值? 问题答案: 尝试这个: 更新: 在较新的Pandas版本中,使用pd.isna():

  • 问题内容: float(‘nan’)结果为Nan(不是数字)。但是,如何检查呢?应该很容易,但是我找不到。 问题答案: 返回如果为(非数字),以及其他。