当前位置: 首页 > 知识库问答 >
问题:

Cloudera spark,RDD为空

松亦
2023-03-14

我试图在cloudera vm上使用pyspark和hive创建一个数据帧,但每次都出现这个错误。

编辑2-sc=SparkContext(appname=“pythonsentimentanalysis”)sqlCtx=HiveContext(sc)

filenameAFINN = "/home/cloudera/Desktop/TwitterSentimentAnalysis/AFINN/AFINN-111.txt"

 afinn = dict(map(lambda (w, s): (w, int(s)), [ ws.strip().split('\t') for ws in open(filenameAFINN) ]))

filenameCandidate = "file:///home/cloudera/Desktop/TwitterSentimentAnalysis/Candidates/Candidate Mapping.txt"

candidates = sc.textFile(filenameCandidate).map(lambda x: (x.strip().split(",")[0],x.strip().split(","))) \
                   .flatMapValues(lambda x:x).map(lambda y: (y[1],y[0])).distinct()


pattern_split = re.compile(r"\W+")

tweets = sqlCtx.sql("select id, text, entities.user_mentions.name from incremental_tweets")

def sentiment(text):
  words = pattern_split.split(text.lower())
  sentiments = map(lambda word: afinn.get(word, 0), words)
  if sentiments:
   sentiment = float(sum(sentiments))/math.sqrt(len(sentiments))
  else:
   sentiment = 0
   return sentiment

   sentimentTuple = tweets.rdd.map(lambda r: [r.id, r.text, r.name]) \
           .map(lambda r: [sentiment(r[1]),r[2]]) \
           .flatMapValues(lambda x: x) \
           .map(lambda y: (y[1],y[0])) \
           .reduceByKey(lambda x, y: x+y) \
           .sortByKey(ascending=True)

  scoreDF = sentimentTuple.join(candidates) \
        .map(lambda (x,y): (y[1],y[0])) \
        .reduceByKey(lambda a,b: a+b) \
        .toDF()

   scoreRenameDF =  scoreDF.withColumnRenamed("_1","Candidate").withColumnRenamed("_2","Score")

   sqlCtx.registerDataFrameAsTable(scoreRenameDF, "SCORE_TEMP")

   sqlCtx.sql("INSERT OVERWRITE TABLE candidate_score SELECT Candidate, Score FROM SCORE_TEMP")

共有1个答案

钱黎明
2023-03-14

如果正确创建了中间RDD,请使用下面的代码检查它们:

for i in rdd.take(10):   print(i)

这将显示RDD的前10个条目

 类似资料:
  • 我对Spark和Scala相对较新。 我从以下数据帧开始(由密集的双倍向量组成的单列): 直接转换为RDD将生成一个org实例。阿帕奇。火花rdd。RDD[org.apache.spark.sql.Row]: 有人知道如何将此DF转换为org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.向量]的实例吗?到目前为止,我的各种尝试都没有成功。

  • RDD

    现在我们已经在我们的系统上安装并配置了PySpark,我们可以在Apache Spark上用Python编程。 但在此之前,让我们了解Spark - RDD中的一个基本概念。 RDD代表Resilient Distributed Dataset ,它们是在多个节点上运行和操作以在集群上进行并行处理的元素。 RDD是不可变元素,这意味着一旦创建了RDD,就无法对其进行更改。 RDD也具有容错能力,因

  • 我使用的是Apache Spark 1.6.2 我有一个。csv数据,它包含大约800万行,我想把它转换成DataFrame 映射RDD可以很好地工作,但是当涉及到将RDD转换为DataFrame时,Spark引发了一个错误 以下是我的代码: 有超过800万行,但是当我将这些行减到只有<500行时,程序就可以正常工作了 数据很乱,每行中的总列经常不同,这就是为什么我需要首先映射它。但是,我想要的数

  • 我有RDD,其中每个记录都是int: 我所需要做的就是将这个RDD拆分成批。即。制作另一个RDD,其中每个元素都是固定大小的元素列表: 这听起来微不足道,然而,最近几天我很困惑,除了下面的解决方案之外,什么也找不到: > 使用ZipWithIndex枚举RDD中的记录: 这将得到我所需要的,然而,我不想在这里使用组。当您使用普通映射Reduce或一些抽象(如Apache Crunch)时,它是微不

  • 我尝试创建一个JavaRDD,其中包含另一系列RDD。 RDD机器。foreach(机器- 第一:有没有可能这样做?如果没有,我可以用什么方式尝试做一些不同的事情? 让我展示一下我尝试做的事情: 我尝试在每台机器上启动我的算法,这台机器必须从Elasticsearch中的数据中学习。 因此,我尝试在每个“机器”中获取查询的所有数据。我的问题是:Spark有可能做到这一点吗?或者以其他方式?当我点燃

  • 我试图将JDBC的ResultSet转换成Spark RDD,并寻找一种有效的方法来使用Spark的并行特性。 以下是我按照这个https://stackoverflow.com/a/32073423/6064131实现的 现在的主要问题是它需要更多的时间,我知道所有数据集都是通过一根针提取的eye.But有没有更好的方法来实现这一点? 有些人可能想知道为什么我没有使用内置功能sqlContext