当前位置: 首页 > 知识库问答 >
问题:

使用 UDF 会在以下情况下忽略条件

吕英豪
2023-03-14

假设您有以下pyspark数据帧:

data= [('foo',), ('123',), (None,), ('bar',)]
df = sqlCtx.createDataFrame(data, ["col"])
df.show()
#+----+
#| col|
#+----+
#| foo|
#| 123|
#|null|
#| bar|
#+----+

接下来的两个html" target="_blank">代码块应该做同样的事情-即,如果列不是<code>null<code>则返回该列的大写。但是,第二种方法(使用<code>udf</code>)会产生错误。

方法1:使用< code > py spark . SQL . functions . upper()

import pyspark.sql.functions as f
df.withColumn(
    'upper',
    f.when(
        f.isnull(f.col('col')),
        f.col('col')
    ).otherwise(f.upper(f.col('col')))
).show()
#+----+-----+
#| col|upper|
#+----+-----+
#| foo|  FOO|
#| 123|  123|
#|null| null|
#| bar|  BAR|
#+----+-----+

方法 2: 在 udf 内部使用 str.上 ()

df.withColumn(
    'upper',
    f.when(
        f.isnull(f.col('col')),
        f.col('col')
    ).otherwise(f.udf(lambda x: x.upper(), StringType())(f.col('col')))
).show()

这给了我< code > attribute error:“NoneType”对象没有属性“upper”。为什么< code>f.isnull()检查在调用< code>when时似乎被忽略了?

我知道我可以将我的 udf 更改为 f.udf(lambda x: x.upper() 如果 x else x, 字符串类型 ()) 以避免此错误,但我想了解为什么会发生这种情况。

完整回溯:

Py4JJavaErrorTraceback (most recent call last)
<ipython-input-38-cbf0ffe73538> in <module>()
      4         f.isnull(f.col('col')),
      5         f.col('col')
----> 6     ).otherwise(f.udf(lambda x: x.upper(), StringType())(f.col('col')))
      7 ).show()

/opt/SPARK2/lib/spark2/python/pyspark/sql/dataframe.py in show(self, n, truncate)
    316         """
    317         if isinstance(truncate, bool) and truncate:
--> 318             print(self._jdf.showString(n, 20))
    319         else:
    320             print(self._jdf.showString(n, int(truncate)))

/opt/SPARK2/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/opt/SPARK2/lib/spark2/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/SPARK2/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o642.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 77.0 failed 4 times, most recent failure: Lost task 51.3 in stage 77.0 (TID 5101, someserver.prod.somecompany.net, executor 99): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<ipython-input-38-cbf0ffe73538>", line 6, in <lambda>
AttributeError: 'NoneType' object has no attribute 'upper'

共有1个答案

彭令秋
2023-03-14

您必须记住,Spark SQL(与RDD不同)不是您看到的,而是您得到的。Optimizer/planner可以按任意顺序安排操作,甚至可以多次重复阶段。

Python udfs 不是基于应用的,而是使用批处理模式。被忽略得不那么多,却不用来优化执行计划的时候:

== Physical Plan ==
*Project [col#0, CASE WHEN isnull(col#0) THEN col#0 ELSE pythonUDF0#21 END AS upper#17]
+- BatchEvalPython [<lambda>(col#0)], [col#0, pythonUDF0#21]
   +- Scan ExistingRDD[col#0]

因此,与udf一起使用的函数必须对输入具有鲁棒性,例如:

df.withColumn(
    'upper',
    f.udf(
        lambda x: x.upper() if x is not None else None, 
        StringType()
    )(f.col('col'))
).show()
 类似资料:
  • 问题内容: 假设您具有以下pyspark DataFrame: 接下来的两个代码块应该做同样的事情-即,如果不是,则返回该列的大写。但是,第二种方法(使用)会产生错误。 方法1 :使用 方法2 :在内部使用 这给了我。为什么调用中的检查似乎被忽略了? 我知道我可以改变我要避免这种错误,但我想知道为什么它的发生。 完整回溯 : 问题答案: 您必须记住,Spark SQL(与RDD不同)不是您所看到的

  • 问题内容: 很抱歉那个愚蠢的问题。如何在javascript切换大小写语言元素中为案件使用条件?像下面的示例一样,当变量<= 5和> 0 时,大小写应该匹配;但是,我的代码不起作用: 感谢您的任何建议! 问题答案: 这有效: 此答案的先前版本认为括号是罪魁祸首。实际上,括号在这里是无关紧要的-唯一必要的是您的case表达式必须为布尔值。 之所以起作用,是因为我们将给开关的值用作比较的依据。因此,同

  • 问题内容: 鉴于以下课程 当我们验证它(例如,使用@Valid)并且如果Website.url不遵守我的自定义@ValidUrl约束时,我们将遇到约束冲突(例如,“ URL不可访问”)。 我想知道如果用户愿意,是否可以忽略该验证。 脚步: 第一次验证表格 引发约束冲突并将其显示给用户 用户选择“我知道,仍然添加”,然后重新提交 第二次验证表单,验证@ValidUrl以外的所有内容 问题答案: 您可

  • 我希望在可以检索到它时忽略一个字段: 任何帮助都将不胜感激!

  • 在上述情况下,如果用户为空,则给出NPE。如何避免NPE?

  • 这是我的xpath: Selenium WebDriver代码忽略了上述xpath中::之后的所有内容。 下面是当我在上面使用xpath作为定位器来标识页面中的元素时,在selenium中遇到的错误。 关于如何让webdriver接受上面的整个xpath,有什么想法或建议吗? 下面是HTML代码: 以下是WebDriverJava代码: 元素是一个复选框,我试图通过应用方法来检查它。单击()。 下