import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import twitter4j.Status
object TrendingHashTags {
def main(args: Array[String]): Unit = {
if (args.length < 8) {
System.err.println("Usage: TrendingHashTags <consumer key> <consumer secret> " +
"<access token> <access token secret> " +
"<language> <batch interval> <min-threshold> <show-count> " +
"[<filters>]")
System.exit(1)
}
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret,
lang, batchInterval, minThreshold, showCount ) = args.take(8)
val filters = args.takeRight(args.length - 8)
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
val conf = new SparkConf().setMaster(("local[4]")).setAppName("TrendingHashTags")
val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))
ssc.checkpoint("checkpoint")
val tweets = TwitterUtils.createStream(ssc, None, filters)
val tweetsFilteredByLang = tweets.filter{tweet => tweet.getLang() == lang}
val statuses = tweetsFilteredByLang.map{tweet => tweet.getText()}
val words = statuses.flatMap{status => status.split("""\s+""")}
val hashTags = words.filter{word => word.startsWith("#")}
val hashTagPairs = hashTags.map{hashtag => (hashtag, 1)}
val tagsWithCounts = hashTagPairs.updateStateByKey(
(counts: Seq[Int], prevCount: Option[Int]) =>
prevCount.map{c => c + counts.sum}.orElse{Some(counts.sum)}
)
val topHashTags = tagsWithCounts.filter { case (t, c) =>
c > minThreshold.toInt
}
val sortedTopHashTags = topHashTags.transform{rdd =>
rdd.sortBy({case(w, c) => c}, false)
}
sortedTopHashTags.print(showCount.toInt)
ssc.start()
ssc.awaitTermination()
}
}
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
at TrendingHashTags$.main(TrendingHashTags.scala:28)
at TrendingHashTags.main(TrendingHashTags.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
以下是我的构建。SBT内容:
name := "sparkStreaming"
version := "0.1"
scalaVersion := "2.11.12"
libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "2.4.5",
"org.apache.spark" %% "spark-sql" % "2.4.5",
"org.apache.spark" %% "spark-streaming" % "2.4.5" % "provided",
"org.apache.spark" %% "spark-streaming-twitter" % "1.6.3")
清楚的迹象是,有些地方你在内部使用较低版本的Spark...(spark 1.5可能是)
sbt inspect tree clean
你可以用这个检查一下。对于maven用户MVN depdency:tree
将给出所有使用的依赖项列表
还有一件事是你在使用
"org.apache.spark" %% "spark-streaming" % "2.4.5" % "provided",
Apache Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。 Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Sca
问题内容: 我总是遇到以下错误。有人可以帮我吗? 当我编译以下代码时。我已经在网上搜索了,但没有找到解决方案。添加saveToCassandra时出现错误。 我的pom是以下内容。 问题答案: org.apache.spark.Logging在Spark版本1.5.2或更低版本中可用。它不在2.0.0中。请更改版本如下
我试图用火花流在推特上找到流行的标签。 twitter4j-stream-4.0.4.jar Spark-Streaming-Twitter2.10-1.0.0.jar 在代码上,齐柏林飞船工作得很好 任何帮助都将不胜感激! 谢谢。
本文向大家介绍apache-spark PairDStreamFunctions.updateStateByKey,包括了apache-spark PairDStreamFunctions.updateStateByKey的使用技巧和注意事项,需要的朋友参考一下 示例 updateState按键可以用于DStream基于即将到来的数据创建有状态。它需要一个功能: 其中采用一系列current值,即
本文向大家介绍apache-spark PairDStreamFunctions.mapWithState,包括了apache-spark PairDStreamFunctions.mapWithState的使用技巧和注意事项,需要的朋友参考一下 示例 mapWithState与相似updateState,可用于根据即将到来的数据创建有状态DStream。它要求StateSpec: 它接受key
问题内容: 我正在尝试使用java作为编程语言来了解Spark中的工作。 说我有一句话“我就是我”。我把句子分解成单词并将其存储为列表。 现在,此函数将分配给每个单词: 所以输出是这样的: 现在,如果我有3个reducer运行,则每个reducer将获得一个键和与该键关联的值: 我想知道 一个。在下面的函数中到底发生了什么。 b。参数是什么 c。基本上,JavaPairRDD是如何形成的。 问题答