当前位置: 首页 > 知识库问答 >
问题:

使用Scala Apache Spark的Twitter流行标签

邢焱
2023-03-14

我正在尝试使用apache spark和Scala获得twitter流行标签。我能够打印这些标签,但是当我开始使用reduce函数计算这些标签时,我得到了以下错误

Network.ConnectionManager:选择器线程中断!

import java.io._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._

object TwitterPopularTags {

  def main(args: Array[String]) {


    val (master, filters) = (args(0), args.slice(5, args.length))

    // Twitter Authentication credentials
    System.setProperty("twitter4j.oauth.consumerKey", "****")
    System.setProperty("twitter4j.oauth.consumerSecret","****")
    System.setProperty("twitter4j.oauth.accessToken", "****")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "****")


    val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(10),
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

    val tweets = TwitterUtils.createStream(ssc, None)

    val statuses = tweets.map(status => status.getText())

    val words = statuses.flatMap(status => status.split(" "))
        val hashTags = words.filter(word => word.startsWith("#"))


     val counts = hashTags.map(tag => (tag, 1))
                         .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 * 5), Seconds(10))

    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

共有1个答案

杭令
2023-03-14

您使用的是reduceByKeyandWindow,这将强制您在Spark中激活检查点。您可以在这里查看如何执行此一行操作

 类似资料:
  • 我对apache Spark是新手。我试图运行https://github.com/prabeesh/sparktwitteranalysis/tree/0.2.0示例,但控制台给出了以下错误: 我已经使用启动了服务器,并通过sbt/sbt包编译了代码 并使用/删除了代码 这个错误的原因是什么?如何解决它 提前谢了。

  • 有人能帮我解决这个问题吗? 谢谢:)

  • 我正在使用twitter流API从用户那里获取推文。我有用户Id列表如何从TwitterStream API获取推文列表。我分析了以下样本 https://github.com/yusuke/twitter4j/blob/master/twitter4j-examples/src/main/java/twitter4j/examples/stream/PrintUserStream.java 但它

  • Twitter标准库 Twitter最重要的标准库是Util 和 Finagle。Util 可以理解为Scala和Java的标准库扩展,提供了标准库中没有的功能或已有功能的更合适的实现。Finagle 是我们的RPC系统,核心分布式系统组件。 Futures已经在并发一节中简单讨论过。它是调异步处理的中心机制,渗透在我们代码库中,也是Finagle的核心。Futures允许组合并发事件,简化了高并

  • 通常,当我使用twitter流api时,我可以直接从以下位置访问标签: 推特- 当用tweepy搜索关于关键词/标签的推文时,它会下载一个<code> 当我搜索实体/主题标签时,我在作者下找到了第一个(我寻找的)。 推特- 这很奇怪。 “标签”位于 推特- 看起来像这样: 当我试图从 推特- 在循环中: 结果:hashtags是一个空字符串.. 使用时: 推特- 生成此错误: 我记得我最后一次和最

  • 问题内容: 使用Twitter4j提供的代码示例,我想在收集到1000个状态列表后停止流,并返回此列表。我怎样才能做到这一点? 问题答案: 将异步代码强制进入同步模式不是一个好主意。换句话说:使您的check方法也异步,传递一个回调参数而不是返回一个值;并将回调传递给execRequest(只需s将对每个分配的调用替换为该回调)。正如Jai所建议的那样,您不一定需要在应用程序范围的事件总线上触发事