当前位置: 首页 > 知识库问答 >
问题:

在自实现对象/类的函数上使用Pysparks rdd.parallelize().map()

狄冠宇
2023-03-14
class MyMathObject():
    def __init__(self, i):
        self.i = i
    def square(self):
        return self.i ** 2


print(MyMathObject(3).square()) # Test one instance with regular python - works

另外,我设置了pyspark(在jupyter笔记本中),现在我想在对象上并行计算0到4的平方:

import findspark
findspark.init()
from pyspark import SparkContext

sc = SparkContext("local[2]")

rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
rdd.map(lambda obj: obj.square()).collect() # This fails

这不起作用--它会导致一个非常长的错误消息,对我来说几乎没有帮助。我唯一觉得有点有趣的一行是:

AttributeError:无法从'/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>中获取

rdd = sc.parallelize([i for i in range(5)])
rdd.map(lambda i: i**2).collect()

因此,我创建或操作对象的方式似乎有缺陷,但我无法追踪错误。

完整的错误消息:

Py4JJavaError跟踪(最近的调用为last)在1 rdd=sc.parallelize([myMathObject(i)for i在range(5)])-->2 rdd.map(lambda obj:obj.square()).collect()中

PY4JJavaError:调用z:org.apache.spark.api.python.pythonrdd.CollectandServe时出错。:arg.apache.spark.sparkException:作业因阶段失败而中止:阶段1.0中的任务0失败了1次,最近的失败:在stage 1.0(TID 2,192.168.2.108,executor driver)中丢失了Task0.0:org.apache.spark.api.python.pythonException:Traceback(最近的调用最后一次):文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py”,第605行,主进程()文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark/worker)文件”/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py“,第271行,在dump_stream vs=list(itertools.islice(iterator,batch)中)文件”/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark/serializers.py“,第147行,在load_stream yield self._read_with_length(stream)/pyspark/serializers.py”,第172行,在_read_with_length返回self.loads(obj)文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py“,第458行,在loads return pickle.loads(obj,encoding=encoding)attributeerror:不能从'/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>中获取 object'<>

在org.apache.spark.api.python.basepythonRunner$readeriterator.handlePythonException(PythonRunner.scala:503)在org.apache.spark.api.python.pythonRunner$$anon$3在org.apache.spark.python.pythonRunner.scala:638)在org.apache.spark.apithon.pythonRunner$$anon$3在interruptibleIterator.scala:37)在scala.collection.iterator.foreach(iterator.scala:941)在scala.collection.iterator.foreach$(iterator.scala:941)在org.apache.spark.interruptibleIterator.foreach$(iterator.scala:28)在scala.collection.generic.growable.$plus$plus$eq(growable.scala:62)在在scala.collection.traversableonce.to(traversableonce.scala:315)在scala.collection.traversableonce.to$(traversableonce.scala:313)在org.apache.spark.interruptibleIterator.to(interruptibleIterator.scala:28)在scala.collection.to(traversableIterator.to(traversableIterator.scala:28)在scala.collection.traversableOnce.to Buffer(traversableOnce.scala:307)在scala.collection.to toArray$(traversableonce.scala:288)在org.apache.spark.interruptibleiterator.toArray(interruptibleiterator.scala:28)在org.apache.spark.rdd.rdd.rdd.to Array(interruptibleiterator.scala:28)在org.apache.spark.rdd.rdd.$anonfun$collect$2(rdd.scala:1004)在org.apache.spark.spark.sparn在org.apache.spark.sparn在k.executor.executor$taskrunner.$anonfun$run$3(executor.scala:462)at org.apache.spark.util.utils$.trywithsafeFinally(utils.scala:1377)在org.apache.spark.executor.executor$taskrunner.run(executor.scala:465)在java.base/java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1128)在java.base/java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.javer.javar.java.lang.thread.run(

Driver StackTrace:at org.apache.spark.scheduler.dagscheduler.failjobandIndependentStages(Dagscheduler.scala:2059)at org.apache.spark.scheduler.dagscheduler.$anonfun$abortstage$2(Dagscheduler.scala:2008)at org.apache.spark.scheduler.dagscheduler.$anonfun$abortstage$2(dagscheduler.scala:2008)at rayBuffer.foreach(arrayBuffer.scala:49)在org.apache.spark.scheduler.dagScheduler.abortStage(DagScheduler.scala:2007)在org.apache.spark.scheduler.dagScheduler.dagScheduler.dagScheduler.dagScheduler.dagScheduler.dagScheduler.dagScheduler.dagScheduler.dagScheduler.$AnonFun$handleTasksetfailed$1(DagScheduler.scala:973)在org.apache.sparch.k.Scheduler.dagSchedulerEventProcessLoop.doon在org.apache.spark.scheduler.dagschedulerEventProcessloop.on在org.apache.spark.scheduler.dagscheduler.scala:2188)在org.apache.spark.scheduler.dagschedulerEventProcessloop.on在org.apache.spark.scheduler.dagscheduler.scala:2177)在org.apache.spark.scheduler.dagscheduler.scala:2177)在sparkcontext.scala:2135)在org.apache.spark.sparkcontext.runjob(sparkcontext.scala:2154)在org.apache.spark.sparkcontext.runjob(sparkcontext.scala:2179)在org.apache.spark.rdd.rdd.$anonfun$collect$1(rdd.scala:1004)在在org.apache.spark.rdd.rdd.rdd.collect(rdd.scala:1003),org.apache.spark.api.python.pythonrdd$.collectAndServe(Pythonrdd.scala:168)在org.apache.spark.api.python.pythonrdd.collectandServe(Pythonrdd.scala)在java.base/jdk.internal.reflect.nativeMethodAccessorimpl.invoke0(原生方法)在在py4j.reflection.methodInvoker.invoke(methodInvoker.java:244)在py4j.reflection.reflection.engine.invoke(reflectionengine.java:357)在py4j.gateway.invoke(gateway.java:282)在py4j.commands.abstractCommand.invokeMethod(abstractCommand.java:132)在py4j.commands.callCommand.execute(callCommand.java:79)在:Traceback(最近调用的最后一次):文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py“,第605行,在主进程()文件”/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/spark-3.0.2-bin-hadoop3.2/python/pyspark.zip/pyspark/worker.py“中,第597行,在进程serializer.dump_stream(out_iter,outfile)文件”/opt/apache-spark-3/spark-3.0.2-))文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第147行,load_stream中的yield self._read_with_length(stream)文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第172行,_read_with_length(stream)中的return serializers.py“,第458行,在loads return pickle.loads(obj,encoding=encoding)attributeError:不能从'/opt/apache-spark-3/中获取

在org.apache.spark.api.python.basepythonRunner$readeriterator.handlePythonException(PythonRunner.scala:503)在org.apache.spark.api.python.pythonRunner$$anon$3在org.apache.spark.python.pythonRunner.scala:638)在org.apache.spark.apithon.pythonRunner$$anon$3在interruptibleIterator.scala:37)在scala.collection.iterator.foreach(iterator.scala:941)在scala.collection.iterator.foreach$(iterator.scala:941)在org.apache.spark.interruptibleIterator.foreach$(iterator.scala:28)在scala.collection.generic.growable.$plus$plus$eq(growable.scala:62)在在scala.collection.traversableonce.to(traversableonce.scala:315)在scala.collection.traversableonce.to$(traversableonce.scala:313)在org.apache.spark.interruptibleIterator.to(interruptibleIterator.scala:28)在scala.collection.to(traversableIterator.to(traversableIterator.scala:28)在scala.collection.traversableOnce.to Buffer(traversableOnce.scala:307)在scala.collection.to toArray$(traversableonce.scala:288)在org.apache.spark.interruptibleiterator.toArray(interruptibleiterator.scala:28)在org.apache.spark.rdd.rdd.rdd.to Array(interruptibleiterator.scala:28)在org.apache.spark.rdd.rdd.$anonfun$collect$2(rdd.scala:1004)在org.apache.spark.spark.sparn在org.apache.spark.sparn在k.executor.executor$taskrunner.$anonfun$run$3(executor.scala:462)at org.apache.spark.util.utils$.trywithsafeFinally(utils.scala:1377)在org.apache.spark.executor.executor$taskrunner.run(executor.scala:465)在java.base/java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.javurent.threadpoolexecutor.javer:1128)在java.base/java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:628)...1更多

共有1个答案

虞俊美
2023-03-14

这看起来不像是您的错误,但是一种解决方法是将模块放在一个单独的Python文件中并导入它:

例如,打开一个文件mymodule.py

class MyMathObject():
    def __init__(self, i):
        self.i = i
    def square(self):
        return self.i ** 2

在主脚本中,可以执行

from mymodule import MyMathObject

rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
rdd.map(lambda obj: obj.square()).collect()
 类似资料:
  • 本文向大家介绍javascript用函数实现对象的方法,包括了javascript用函数实现对象的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了javascript用函数实现对象的方法。分享给大家供大家参考。具体实现方法如下: 希望本文所述对大家的javascript程序设计有所帮助。

  • 问题内容: 我有一个扩展其超类的子类的对象。子类中有一个重写的方法,可以使用对象调用该方法。可以使用子类对象调用超类的函数吗? 考虑上面的代码。 它在这里打印 子类去 。相反,我必须打印 超类去 。 问题答案: 不,这是不可能的,如果您认为需要,请重新考虑您的设计。覆盖方法的全部要点是替换其功能。如果一个不同的类对该类的内部工作非常了解,那么您将完全取消封装。

  • 问题内容: 我正在尝试验证实例属性和类属性之间的区别,该区别由2012年11月1日发布的Python教程2.7.3版第9章:类,第66页最后一行( 源 ): 实例对象的有效方法名称取决于其类。根据定义,作为函数对象的类的所有属性都定义了其实例的相应方法。因此,在我们的示例中,xf是有效的方法引用,因为MyClass.f是函数,而xi则不是,因为MyClass.i不是。 但是xf与MyClass.f

  • 问题内容: 我有这样的事情: 如您所见,超类的init方法是抽象的,并在创建对象后由构造函数自动调用。 当我需要创建这种类型的对象且其结构不会及时更改时,我很好奇我是否会遇到这样的代码问题。 有什么更好的办法吗?它可以在Java中运行,但是可以在C ++和ActionScript中运行吗? 谢谢你的答案。 问题答案: 请勿从构造函数中调用过多的方法。 引用 有效Java 2nd Edition,条

  • 我是科特林·纽比。我有一组函数,它们接受和解析不同的输入(纯文本、json、xml),但具有相同的输出(和事件实例)。代码如下所示(完整版本见https://pastebin.com/UNJFGZsm): 当我尝试构建时,会出现如下错误: (44,11):外投影类型“功能1” 但是,如果我不使用函数,代码似乎可以正确构建和工作。 为什么?为什么问题似乎只影响

  • 问题内容: 如何从Java的构造函数中获取实例化对象? 我想为某些GUI类存储对父对象的引用,以模拟事件冒泡-调用父处理程序-但我不想更改所有现有代码。 问题答案: 简短的回答:Java没有办法做到这一点。(您可以找到哪个班级叫您,但以下较长的答案在大多数情况下也适用于您。) 长话大说:依赖于被调用的地方,魔术地表现不同的代码几乎总是一个坏主意。这会使必须维护您代码的人感到困惑,并且严重损害了您的