我试图在cloudera vm上使用pyspark和hive创建一个数据帧,但每次都出现这个错误。
编辑2-sc=SparkContext(appname=“pythonsentimentanalysis”)sqlCtx=HiveContext(sc)
filenameAFINN = "/home/cloudera/Desktop/TwitterSentimentAnalysis/AFINN/AFINN-111.txt"
afinn = dict(map(lambda (w, s): (w, int(s)), [ ws.strip().split('\t') for ws in open(filenameAFINN) ]))
filenameCandidate = "file:///home/cloudera/Desktop/TwitterSentimentAnalysis/Candidates/Candidate Mapping.txt"
candidates = sc.textFile(filenameCandidate).map(lambda x: (x.strip().split(",")[0],x.strip().split(","))) \
.flatMapValues(lambda x:x).map(lambda y: (y[1],y[0])).distinct()
pattern_split = re.compile(r"\W+")
tweets = sqlCtx.sql("select id, text, entities.user_mentions.name from incremental_tweets")
def sentiment(text):
words = pattern_split.split(text.lower())
sentiments = map(lambda word: afinn.get(word, 0), words)
if sentiments:
sentiment = float(sum(sentiments))/math.sqrt(len(sentiments))
else:
sentiment = 0
return sentiment
sentimentTuple = tweets.rdd.map(lambda r: [r.id, r.text, r.name]) \
.map(lambda r: [sentiment(r[1]),r[2]]) \
.flatMapValues(lambda x: x) \
.map(lambda y: (y[1],y[0])) \
.reduceByKey(lambda x, y: x+y) \
.sortByKey(ascending=True)
scoreDF = sentimentTuple.join(candidates) \
.map(lambda (x,y): (y[1],y[0])) \
.reduceByKey(lambda a,b: a+b) \
.toDF()
scoreRenameDF = scoreDF.withColumnRenamed("_1","Candidate").withColumnRenamed("_2","Score")
sqlCtx.registerDataFrameAsTable(scoreRenameDF, "SCORE_TEMP")
sqlCtx.sql("INSERT OVERWRITE TABLE candidate_score SELECT Candidate, Score FROM SCORE_TEMP")
如果正确创建了中间RDD,请使用下面的代码检查它们:
for i in rdd.take(10): print(i)
这将显示RDD的前10个条目
我对Spark和Scala相对较新。 我从以下数据帧开始(由密集的双倍向量组成的单列): 直接转换为RDD将生成一个org实例。阿帕奇。火花rdd。RDD[org.apache.spark.sql.Row]: 有人知道如何将此DF转换为org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.向量]的实例吗?到目前为止,我的各种尝试都没有成功。
现在我们已经在我们的系统上安装并配置了PySpark,我们可以在Apache Spark上用Python编程。 但在此之前,让我们了解Spark - RDD中的一个基本概念。 RDD代表Resilient Distributed Dataset ,它们是在多个节点上运行和操作以在集群上进行并行处理的元素。 RDD是不可变元素,这意味着一旦创建了RDD,就无法对其进行更改。 RDD也具有容错能力,因
我使用的是Apache Spark 1.6.2 我有一个。csv数据,它包含大约800万行,我想把它转换成DataFrame 映射RDD可以很好地工作,但是当涉及到将RDD转换为DataFrame时,Spark引发了一个错误 以下是我的代码: 有超过800万行,但是当我将这些行减到只有<500行时,程序就可以正常工作了 数据很乱,每行中的总列经常不同,这就是为什么我需要首先映射它。但是,我想要的数
我有RDD,其中每个记录都是int: 我所需要做的就是将这个RDD拆分成批。即。制作另一个RDD,其中每个元素都是固定大小的元素列表: 这听起来微不足道,然而,最近几天我很困惑,除了下面的解决方案之外,什么也找不到: > 使用ZipWithIndex枚举RDD中的记录: 这将得到我所需要的,然而,我不想在这里使用组。当您使用普通映射Reduce或一些抽象(如Apache Crunch)时,它是微不
我尝试创建一个JavaRDD,其中包含另一系列RDD。 RDD机器。foreach(机器- 第一:有没有可能这样做?如果没有,我可以用什么方式尝试做一些不同的事情? 让我展示一下我尝试做的事情: 我尝试在每台机器上启动我的算法,这台机器必须从Elasticsearch中的数据中学习。 因此,我尝试在每个“机器”中获取查询的所有数据。我的问题是:Spark有可能做到这一点吗?或者以其他方式?当我点燃
我试图将JDBC的ResultSet转换成Spark RDD,并寻找一种有效的方法来使用Spark的并行特性。 以下是我按照这个https://stackoverflow.com/a/32073423/6064131实现的 现在的主要问题是它需要更多的时间,我知道所有数据集都是通过一根针提取的eye.But有没有更好的方法来实现这一点? 有些人可能想知道为什么我没有使用内置功能sqlContext