当前位置: 首页 > 知识库问答 >
问题:

调用外部 Web 服务的数据砖 UDF 无法序列化(酸洗错误)

闻人栋
2023-03-14

我正在使用数据库,并且在数据框架中有一个列,我需要通过外部Web服务调用为每条记录更新该列。在这种情况下,它使用Azure机器学习服务SDK并进行服务调用。当在Spark中不作为UDF运行时,此代码可以正常工作(即。只是python),但是当我尝试将其称为UDF时,它会引发序列化错误。如果我使用lambda和带有rdd的map,也会发生同样的情况。

该模型使用快速文本,并且可以通过正常的超文本传输协议调用或使用AMLS的WebService SDK从Postman或python中很好地调用-只是当它是UDF时,它会在以下消息中失败:

TypeError:无法pickle_thread_局部对象

我能想到的唯一解决方法是按顺序遍历数据帧中的每个记录,并使用调用更新记录,但是这并不是很有效。我不知道这是火花错误还是因为服务正在加载快速文本模型。当我使用UDF并模拟返回值时,它的工作原理。

底部错误...

from azureml.core.webservice import Webservice, AciWebservice
from azureml.core import Workspace

def predictModelValue2(summary, modelName, modelLabel):  
    raw_data = '[{"label": "' + modelLabel + '", "model": "' + modelName + '", "as_full_account": "' + summary + '"}]'
    prediction = service.run(raw_data)
    return prediction

from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

predictModelValueUDF = udf(predictModelValue2)

DVIRCRAMFItemsDFScored1 = DVIRCRAMFItemsDF.withColumn("Result", predictModelValueUDF("Summary", "ModelName", "ModelLabel"))

TypeError:无法pickle_thread_局部对象

在处理上述异常时,发生了另一个异常:

中的PicklingError回溯(最近一次调用)----

/data bricks/spark/python/py spark/SQL/UDF . py in wrapper(* args)194 @ func tools . wrappers(self . func,assigned = assignments)195 def wrapper(* args):-

/数据砖/火花/蟒蛇/pyspark/sql/udf.py在呼叫(自我, *cols) 172 173 定义调用(自我, *cols): --

/databricks/spark/python/pyspark/sql/udf。py in_judf(self)156#,并且应该具有最小的性能影响。157如果是自我_judf_placeholder为无:--

_create_judf/databricks/spark/python/pyspark/sql/udf.py165 sc=spark.sparkContext166--

/data bricks/spark/python/py spark/SQL/UDF . py in _ wrap _ function(sc,func,returnType)33 def _ wrap _ function(sc,func,returnType): 34 command = (func,returnType) -

/databricks/spark/python/pyspark/rdd.py_prepare_for_python_RDD(sc, Command)2461#序列化的命令将被广播压缩2462 ser=CloudPickleSerializer()-

/数据砖/火花/蟒蛇/pyspark/序列化程序.py转储(自我,obj) 709 msg = “无法序列化对象: %s: %s” % (e.class.name, emsg) 710 cloudpickle.print_exec (sys.stderr) --

PicklingError:无法序列化对象:TypeError:不能pickle_thread_局部对象

共有1个答案

艾宏远
2023-03-14

我不是 DataBricks 或 Spark 方面的专家,但是当您触摸复杂对象(如服务对象)时,从本地笔记本上下文中酸洗函数总是有问题的。在这种特殊情况下,我建议删除对 azureML 服务对象的依赖性,只使用请求来调用服务。

从服务中提取密钥:

# retrieve the API keys. two keys were generated.
key1, key2 = service.get_keys()
scoring_uri = service.scoring_uri

您应该能够直接在UDF中使用这些字符串而不会出现酸洗问题——这里有一个示例,说明如何仅使用请求调用服务。下面应用于您的UDF:

import requests, json
def predictModelValue2(summary, modelName, modelLabel):  
  input_data = json.dumps({"summary": summary, "modelName":, ....})

  headers = {'Content-Type':'application/json', 'Authorization': 'Bearer ' + key1}

  # call the service for scoring
  resp = requests.post(scoring_uri, input_data, headers=headers)

  return resp.text[1]

但是,在侧节点上:数据帧中的每一行都会调用UDF,每次都会进行网络调用,这将非常慢。我建议寻找批处理执行的方法。正如您可以从构建的json<code>服务中看到的。run将接受一个项目数组,因此您应该以100秒左右的时间批调用它。

 类似资料:
  • 问题内容: 我想搜索给定目录中所有图像中的冲浪,并保存它们的关键点和描述符以供将来使用。我决定使用泡菜,如下所示: 当我尝试执行时,出现以下错误: 有人知道吗,这是什么意思,以及如何解决?我正在使用Python 2.6和Opencv 2.3.1 十分感谢 问题答案: 问题是您不能将cv2.KeyPoint转储到pickle文件中。我遇到了同样的问题,并设法通过本质上对关键点进行序列化和反序列化来解

  • 问题内容: 我有一个类,该类的实例需要按照用户的指示格式化输出。有一种默认格式,可以覆盖。我是这样实现的: 可以吗 让我知道是否有更好的方法来进行设计。 不幸的是,我需要腌制此类的实例。但是只有在模块顶层定义的功能才能被酸洗。函数是不可拾取的,因此我的instance属性破坏了酸洗。 我尝试重写此代码以使用类方法而不是lambda函数,但由于相同的原因仍然没有运气: 请注意,即使不覆盖默认值,此处

  • 这里的TYPO3留档https://docs.typo3.org/typo3cms/ExtbaseFluidBook/6-Persistence/4-use-foreign-data-sources.html说,可以为extbase对象使用外部数据源:“这些外部数据源可能是来自同一个TYPO3数据库或网络服务的表。” 使用的数据映射器至少允许将对象映射到同一MySQL数据库中的任何表。 我对“或w

  • 问题内容: 我必须腌制这样的对象数组: 它给出了以下错误: 有办法解决吗? 问题答案: 内置的pickle模块无法序列化几种python对象(包括lambda函数,嵌套函数和在命令行中定义的函数)。 picloud软件包包括一个更强大的pickler,可以对lambda函数进行pickle。 可以使用常规的pickle / cPickle和功能来反序列化PiCloud序列化的对象。 莳萝还提供类似

  • 此功能的状态是实验性的。我们欢迎您对此功能的实用性的反馈。我们可能会在未来发展这个功能,使其更加通用。虽然我们将努力确保向后兼容性,但不能保证。 场景 在某些情况下,希望能够从交易处理函数中调用REST API。这使你可以将区块链中的复杂计算移出。调用REST API允许交易处理器功能将复杂或昂贵的计算外包给中央或peer托管的服务。 调用外部REST服务 post(url,data)函数可用于交