我在运行下面我写的SPARK代码时出错了。我试图根据键找到所有向量的总和。每个输入行以键(整数)开始,然后是127个浮点数,这是一个具有127个维度的单个向量,即每一行以键和向量开始。
from cStringIO import StringIO
class testing:
def __str__(self):
file_str = StringIO()
for n in self.vector:
file_str.write(str(n))
file_str.write(" ")
return file_str.getvalue()
def __init__(self,txt="",initial=False):
self.vector = [0.0]*128
if len(txt)==0:
return
i=0
for n in txt.split():
if i<128:
self.vector[i]=float(n)
i = i+1
continue
self.filename=n
break
def addVec(self,r):
a = testing()
for n in xrange(0,128):
a.vector[n] = self.vector[n] + r.vector[n]
return a
def InitializeAndReturnPair(string,first=False):
vec = testing(string,first)
return 1,vec
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setMaster("local")
.setAppName("My app")
.set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
inp = sc.textFile("input.txt")
output = inp.map(lambda s: InitializeAndReturnPair(s,True)).cache()
output.saveAsTextFile("output")
print output.reduceByKey(lambda a,b : a).collect()
input.txt中的示例行
6.0 156.0 26.0 3.0 1.0 0.0 2.0 1.0 15.0 113.0 53.0 139.0 156.0 0.0 0.0 0.0 156.0 29.0 1.0 38.0 59.0 0.0 0.0 0.0 28.0 4.0 2.0 9.0 1.0 0.0 0.0 0.0 9.0 83.0 13.0 1.0 0.0 9.0 42.0 7.0 41.0 71.0 74.0 123.0 35.0 17.0 7.0 2.0 156.0 27.0 6.0 33.0 11.0 2.0 0.0 11.0 35.0 4.0 2.0 4.0 1.0 3.0 2.0 4.0 0.0 0.0 0.0 0.0 2.0 19.0 45.0 17.0 47.0 2.0 2.0 7.0 59.0 90.0 15.0 11.0 156.0 14.0 1.0 4.0 9.0 11.0 2.0 29.0 35.0 6.0 5.0 9.0 4.0 2.0 1.0 3.0 1.0 0.0 0.0 0.0 1.0 5.0 25.0 14.0 27.0 2.0 0.0 2.0 86.0 48.0 10.0 6.0 156.0 23.0 1.0 2.0 21.0 6.0 0.0 3.0 31.0 10.0 4.0 3.0 0.0 0.0 1.0 2.0
下面是我得到的错误。此错误来自代码的最后一行,即
输出。reduceByKey
错误消息-http://pastebin.com/0tqiiJQm
我不确定该如何解决这个问题。我尝试使用一个
封送序列化程序
,但它给出了同样的问题。
------------------------------回答------------------------------------
我从apache用户列表中得到了同样问题的答案。基本上,集群中运行的mapper/reducer没有类定义,我们必须通过在另一个模块中编写类并在使用
sc.addPyFile(os.path( HOMEDirectory + "module.py"))
谢谢大家帮助我。
你可以使用与火花工作得很好的Numpy数组。
import numpy as np
def row_map(text):
split_text = text.split()
# create numpy array from elements besides the first element
# which is the key
return split_text(0), np.array([float(v) for v in split_text[1:]])
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setMaster("local")
.setAppName("My app")
.set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
inp = sc.textFile("input.txt")
output = inp.map(row_map).cache()
#Below line is throwing error
print output.reduceByKey(lambda a,b : np.add(a,b)).collect()
更多的简洁和蟒蛇。
每次使用cassandra connector在spark中运行scala程序时都会出现此错误 这是我的程序
我试过在Spark中构建包,它会抛出以下错误。命令:sbt包 hduser@hduser-virtualbox:/usr/local/spark-1.1.0-bin-hadoop1/project$cat>simple.sbt name:=“简单项目” scalaVersion:=“2.9.2” libraryDependencies+=“org.apache.spark”%“spark-core
我正在从一个消息应用程序收集数据,我目前正在使用Flume,它每天发送大约5000万条记录 我希望使用Kafka,使用Spark Streaming从Kafka消费并将其持久化到hadoop并使用impala进行查询 我尝试的每种方法都有问题。。 方法1-将RDD另存为parquet,将外部配置单元parquet表指向parquet目录 问题是finalParquet.saveAsParquetF
我的spark程序在小数据集上运行良好。(大约400GB)但是当我将其扩展到大型数据集时。我开始得到错误
我正在使用Apache Spark的示例代码follow文档:https://spark.apache.org/docs/latest/ml-features.html#countvectorizer 但我收到错误消息: 22年10月15日23:04:20信息BlockManagerMaster:使用703.6 MB RAM注册block manager localhost:56882,Block
尝试从/向redshift读/写(s3中的数据)。但在访问数据帧时会出现奇怪的错误。我可以看到正在创建数据帧,并且它能够访问数据,因为它输出表的列名