当前位置: 首页 > 知识库问答 >
问题:

Spark流媒体-过滤带有地理定位的流媒体后的推文

许展鹏
2023-03-14

我是一个初学者,试图使用spark streaming获得推文,使用Scala和一些过滤器关键字。是否有可能在流媒体之后只过滤那些没有地理定位为Null的推文?我正在尝试保存ElasticSearch中的推文。所以,在将tweet地图保存到ElasticSearch之前,我可以过滤那些带有地理定位信息的地图,然后保存它们吗?我正在使用json4s.jsondsl和tweet中的字段创建JSON。这是示例代码

val stream=twitterutils.createstream(ssc,None,filters)val tweetMap=stream.map(status=>{val tweetMap=

      ("location" -> Option(status.getGeoLocation).map(geo => { s"${geo.getLatitude},${geo.getLongitude}" })) ~
      ("UserLang" -> status.getUser.getLang) ~
      ("UserLocation" -> Option(status.getUser.getLocation)) ~
      ("UserName" -> status.getUser.getName) ~
      ("Text" -> status.getText) ~
      ("TextLength" -> status.getText.length) ~
      //Tokenized the tweet message and then filtered only words starting with #
      ("HashTags" -> status.getText.split(" ").filter(_.startsWith("#")).mkString(" ")) ~
      ("PlaceCountry" -> Option(status.getPlace).map (pl => {s"${pl.getCountry}"}))
// Each batch is saved to Elasticsearch 
tweetMap.foreachRDD { tweets => EsSpark.saveToEs(tweets, "sparksender/tweets")) }

//在这一步之前,有没有办法过滤掉“位置”为空的tweets?

我引用了来自GitHub的代码:https://github.com/luvgupta008/screamingtwitter/blob/master/src/main/scala/com/spark/streaming/twitterprismer.scala

共有1个答案

堵浩波
2023-03-14

查看RDD上的filter方法。接受谓词函数(a:a)=>Boolean。如果返回值为true,则将该元素添加到列表中。如果为false,则不会将该元素添加到列表中。

tweetMap.filter(
  status => Option(status.getGeoLocation) match {
    case Some(_) => true
    case None => false
  })
 类似资料:
  • 问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp

  • 我正在读这篇博文: http://blog.jaceklaskowski.pl/2015/07/20/real-time-data-processing-using-apache-kafka-and-spark-streaming.html 它讨论了如何使用Spark Streaming和Apache Kafka进行一些近实时处理。我完全理解这篇文章。它确实展示了我如何使用Spark Stream

  • 我正在尝试从Spark官方网站运行Spark Streaming示例 这些是我在pom文件中使用的依赖项: 这是我的Java代码: 当我尝试从Eclipse运行它时,我遇到以下异常: 我从我的IDE(eclipse)运行它。我是否必须创建并将JAR部署到火花中以使其运行。如果有人知道这个异常,请分享您的经验。提前谢谢

  • Streaming API用于通过令牌读取JSON令牌。 它将JSON内容读写为离散事件。 JsonReader和JsonWriter将数据读/写为令牌,称为JsonToken 。 它是处理JSON的三种方法中最强大的方法。 它具有最低的开销,并且在读/写操作中非常快。 它类似于XML的Stax解析器。 在本章中,我们将展示使用GSON流API来读取JSON数据。 Streaming API与to

  • 这是因为检查点只存储了其中一个数据流的偏移量吗?浏览Spark结构流文档,似乎可以在Spark 2.2或>中进行流源的联接/联合

  • 收听电台广播的流媒体直播,还可以录制广播。 作者说:有问题欢迎和我QQ信箱交流:10040142@qq.com [Code4App.com]