当前位置: 首页 > 面试题库 >

如何使用Python类处理RDD?

芮承运
2023-03-14
问题内容

我正在Spark中将模型实现为python类,并且每次尝试将类方法映射到RDD时,它都会失败。我的实际代码更加复杂,但是这个简化的版本成为了问题的核心:

class model(object):
    def __init__(self):
        self.data = sc.textFile('path/to/data.csv')
        # other misc setup
    def run_model(self):
        self.data = self.data.map(self.transformation_function)
    def transformation_function(self,row):
        row = row.split(',')
        return row[0]+row[1]

现在,如果我像这样运行模型(例如):

test = model()
test.run_model()
test.data.take(10)

我收到以下错误:

例外:看来您试图从广播变量,操作或转换中引用SparkContext。SparkContext只能在驱动程序上使用,而不能在工作程序上运行的代码中使用。有关更多信息,请参见SPARK-5063。

我已经玩了一点,并且似乎在我尝试将类方法映射到类中的RDD时,确实可以可靠地发生。我已经确认,如果我在类结构之外实现映射函数,则该函数可以正常工作,因此问题肯定与该类有关。有办法解决吗?


问题答案:

与使用 嵌套的RDD或在转换内部执行Spark操作 相比,这里的问题要微妙得多。Spark不允许访问SparkContext内部操作或转换。

即使您没有显式访问它,也要在闭包内部对其进行引用,并且必须对其进行序列化和携带。这意味着transformation引用的方法也会self保留SparkContext,因此会出现错误。

一种解决方法是使用静态方法:

class model(object):
    @staticmethod
    def transformation_function(row):
        row = row.split(',')
        return row[0]+row[1]

    def __init__(self):
        self.data = sc.textFile('some.csv')

    def run_model(self):
        self.data = self.data.map(model.transformation_function)

编辑

如果您希望能够访问实例变量,可以尝试如下操作:

class model(object):
    @staticmethod
    def transformation_function(a_model):
        delim = a_model.delim
        def _transformation_function(row):
            return row.split(delim)
        return _transformation_function

    def __init__(self):
        self.delim = ','
        self.data = sc.textFile('some.csv')

    def run_model(self):
        self.data = self.data.map(model.transformation_function(self))


 类似资料:
  • 本文向大家介绍如何使用python处理Selenium中的帧?,包括了如何使用python处理Selenium中的帧?的使用技巧和注意事项,需要的朋友参考一下 我们可以处理Selenium中的帧。框架是一个HTML元素,用于将文档保留在页面中的另一个文档中。HTML具有<frame>或<iframe>标记,用于将框架嵌入文档中。 Selenium中有多个API可用于框架。它们在下面列出- swit

  • 问题内容: 问题: GhostDriver API尚不支持警报处理。目前有一种可接受的解决方法,即将您自己的javascript注入将处理警报并为您存储其文本的页面。 我在通过python webdriver绑定使用此解决方法时遇到了麻烦。这可能与我的新手对javascript的理解有关。 这是我尝试利用的变通方法的示例:https : //github.com/detro/ghostdriver

  • 问题内容: 有人可以为我指出如何在python中打开.mdb文件的正确方向吗?我通常喜欢包含一些代码以开始讨论,但是我不知道从哪里开始。我与mysql一起使用python。我想知道是否有一种以类似方式使用.mdb文件的方法? 问题答案: 以下是我为另一个SO问题编写的一些代码。 它需要第三方的pyodbc模块。 这个非常简单的示例将连接到表并将结果导出到文件。 如果您有任何其他更具体的需求,请随时

  • 问题内容: 当我使用访问URL的请求时, cookie会自动发送回服务器 (在下面的示例中,所请求的URL设置了一些cookie值,然后重定向到另一个显示存储的cookie的URL) 是否可以通过将Chrome或Firefox设置为不接受Cookie的方式来临时禁用Cookie处理? 例如,如果我在禁用了cookie处理的情况下使用Chrome访问上述URL,则会得到我期望的结果: 问题答案: 您

  • 我有一个作为服务运行的Python脚本。它写入磁盘。如果用户在服务上调用systemctl stop,我想以自己的方式处理该命令,以降低文件损坏的风险。 如何捕获systemctl停止命令? 我在 /usr/lib/systemd/system的文件是: 我的Python脚本是:

  • 本文向大家介绍如何使用Tensorflow和Python整理预处理的数据?,包括了如何使用Tensorflow和Python整理预处理的数据?的使用技巧和注意事项,需要的朋友参考一下 Tensorflow是Google提供的一种机器学习框架。它是一个开放源代码框架,与Python结合使用以实现算法,深度学习应用程序等等。它用于研究和生产目的。它具有优化技术,可帮助快速执行复杂的数学运算。这是因为它