mapWithState与相似updateState,可用于根据即将到来的数据创建有状态DStream。它要求StateSpec:
import org.apache.spark.streaming._ object StatefulStats { val state = StateSpec.function( (key: String, current: Option[Double], state: State[StatCounter]) => { (current, state.getOption) match { case (Some(x), Some(cnt)) => state.update(cnt.merge(x)) case (Some(x), None) => state.update(StatCounter(x)) case (None, None) => state.update(StatCounter()) case _ => } (key, state.get) } ) }
它接受key key,currentvalue和累计State并返回新状态。全部放在一起:
import org.apache.spark._ import org.apache.spark.streaming.dstream.DStream import scala.collection.mutable.Queue import org.apache.spark.util.StatCounter object MapStateByKeyApp { def main(args: Array[String]) { val sc = new SparkContext("local", "mapWithState", new SparkConf()) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint("/tmp/chk") val queue = Queue( sc.parallelize(Seq(("foo", 5.0), ("bar", 1.0))), sc.parallelize(Seq(("foo", 1.0), ("foo", 99.0))), sc.parallelize(Seq(("bar", 22.0), ("foo", 1.0))), sc.emptyRDD[(String, Double)], sc.parallelize(Seq(("foo", 1.0), ("bar", 1.0))) ) val inputStream: DStream[(String, Double)] = ssc.queueStream(queue) inputStream.mapWithState(StatefulStats.state).print() ssc.start() ssc.awaitTermination() ssc.stop() } }
最终预期输出:
------------------------------------------- Time: 1469923280000 ms ------------------------------------------- (foo,(count: 1, mean: 5.000000, stdev: 0.000000, max: 5.000000, min: 5.000000)) (bar,(count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000)) ------------------------------------------- Time: 1469923290000 ms ------------------------------------------- (foo,(count: 3, mean: 35.000000, stdev: 45.284287, max: 99.000000, min: 1.000000)) (foo,(count: 3, mean: 35.000000, stdev: 45.284287, max: 99.000000, min: 1.000000)) ------------------------------------------- Time: 1469923300000 ms ------------------------------------------- (bar,(count: 2, mean: 11.500000, stdev: 10.500000, max: 22.000000, min: 1.000000)) (foo,(count: 4, mean: 26.500000, stdev: 41.889736, max: 99.000000, min: 1.000000)) ------------------------------------------- Time: 1469923310000 ms ------------------------------------------- ------------------------------------------- Time: 1469923320000 ms ------------------------------------------- (foo,(count: 5, mean: 21.400000, stdev: 38.830916, max: 99.000000, min: 1.000000)) (bar,(count: 3, mean: 8.000000, stdev: 9.899495, max: 22.000000, min: 1.000000))
Apache Spamassassin是一个集成到cPanel中的软件,有助于防止在您的邮箱中收到垃圾邮件。 它有助于防止您的邮箱被垃圾邮件淹没。 启用Apache Spamassassin后,它会按垃圾邮件分数对每封邮件进行评级。 您可以根据垃圾邮件分数选择要自动删除的垃圾邮件。 如果您选择垃圾邮件分数为5的自动垃圾邮件删除,则将自动删除所有垃圾邮件分数为5或更高的电子邮件。 启用或禁用Apac
学习如何在Java编程中使用Tika。 以下是示例 - 如何使用java从PDF中提取内容。 如何使用java从ODF中提取内容。 如何使用java从Excel工作表中提取内容。 如何使用java从文本文档中提取内容。 如何使用java从XML文档中提取内容。 如何使用java从HTML文档中提取内容。 如何使用java从java .class文件中提取内容。
有点问题 如果我使用Apache Ignite进行消息传递和事件处理,是否仍需要使用Kafka 本质上,Kafka会为我提供什么(如果有)Ignite的附加功能 提前感谢
Apache HTTP Server(简称 Apache)是 Apache 软件基金会的一个开放源码的网页服务器,可以在大多数计算机操作系统中运行,由于其多平台和安全性被广泛使用,是最流行的 Web 服务器端软件之一。它快速、可靠并且可通过简单的 API 扩展,将Perl/Python等解释器编译到服务器中。 Apache 起初由伊利诺伊大学香槟分校的国家超级电脑应用中心(NCSA)开发。此后,A
我正在学习如何将kafka与apache camel集成,我遇到了以下错误。我在c:/inbox文件夹中创建了一个文件,并希望使用kafka Consumer使用其中的文本。我使用的是apache Camel3.1.0版本。下面是我的代码 下面是我得到的错误
了解如何在Java编程中使用POI Word。 以下是示例 - 如何使用Java创建空白word文档。 如何使用Java在word文档中编写段落。 如何使用Java将边框应用于word文档中的文本。 如何使用Java将表添加到word文档。 如何使用Java格式化word文档中的文本。 如何使用Java对齐word文档中的文本。
了解如何在Java编程中使用POI Excel。 以下是示例 - 如何使用Java创建空白Excel工作表。 如何使用Java将数据写入Excel工作表。 如何使用Java在电子表格中创建不同类型的单元格。 如何使用Java将不同样式应用于电子表格中的单元格。 如何使用Java将字体应用于单元格的内容。 如何使用Java为单元格中的文本设置方向。 如何使用Java向单元格的内容添加超链接。 如何使
了解如何在Java编程中使用POI PPT。 以下是示例 - 如何使用java创建空白PPT文档。 如何使用java将图像添加到PPT中的幻灯片。 如何使用java在PPT中的幻灯片上创建超链接。 如何使用java格式化PPT中幻灯片上的文本。 如何使用java合并两个PPT。 如何将PPT的幻灯片转换为图像。