val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new SQLContext(sc)
val lines = ssc.textFileStream("input")
lines.foreachRDD { rdd =>
val count = rdd.count()
if (count > 0) {
val dataSet = sqlContext.read.json(rdd)
val accountIds = dataSet.select("accountId").distinct.collect.flatMap(_.toSeq)
val accountIdArry = accountId.map(accountId => dataSet.where($"accountId" <=> accountId))
accountIdArry.foreach { arrEle =>
print(arrEle.count)
arrEle.show
arrEle.write.format("json").save("output")
}
}
}
org.apache.spark.SparkException: Task not serializable
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import play.api.libs.json._
import org.apache.spark.sql._
import org.apache.spark.streaming.dstream._
object SparkExample {
def main(inputDir: String) {
val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new SQLContext(sc)
val lines: DStream[String] = ssc.textFileStream(inputDir)
val jsonLines = lines.map[JsValue](l => Json.parse(l))
val accountIdLines = jsonLines.map[(String, JsValue)](json => {
val accountId = (json \ "accountId").as[String]
(accountId, json)
})
val accountIdCounts = accountIdLines
.map[(String, Long)]({ case (accountId, json) => {
(accountId, 1)
} })
.reduceByKey((a, b) => a + b)
// this DStream[(String, Long)] will have current accumulated count for accountId's
val updatedAccountCounts = accountIdCounts
.updateStateByKey(updatedCountOfAccounts _)
}
def updatedCountOfAccounts(a: Seq[Long], b: Option[Long]): Option[Long] = {
b.map(i => i + a.sum).orElse(Some(a.sum))
}
}
有两件事你需要记住。
首先,由于您使用的是StreamingContext
带有2秒
微批处理,因此DStreams
将包含RDD,其中只有在这2秒内生成的数据,而不是所有数据。如果您需要对当时所有可用的数据执行操作,那么流就不适合您的问题。
第二,您不需要使用sql上下文来处理JSON。只需使用任何json库并将rdd分组在AccountID
上即可。
import play.api.libs.json._
val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new SQLContext(sc)
val dstreams = ssc.textFileStream("input")
dstreams.foreachRDD { rdd =>
val jsonRdd = rdd.map(l => Json.parse(l))
val grouped = jsonRdd.groupBy(json => (json \ "accountId").as[String])
}
import play.api.libs.json._
val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new SQLContext(sc)
val lines: DStream[String] = ssc.textFileStream("inputPath")
val jsonLines = lines.map[JsValue](l => Json.parse(l))
val accountIdLines = jsonLines.map[(String, JsValue)](json => {
val accountId = (json \ "accountId").as[String]
(accountId, json)
})
val accounIdCounts = accountIdLines
.map[(String, Long)]({ case (accountId, json) => {
(accountId, 1)
} })
.reduceByKey((a, b) => a + b)
// this DStream[(String, Long)] will have current accumulated count for accountId's
val updatedAccountCounts = accountIdCounts
.updateStateByKey(updateCountOfAccounts _)
def updatedCountOfAccounts(a: Seq[Long], b: Option[Long]): Option[Long] = {
b.map(i => i + a.sum).orElse(Some(a.sum))
}
问题内容: 我正在使用SQL Server2008。我具有下表中的数据: 我想以这种格式获取数据: 我怎样才能做到这一点? 问题答案: 使用PIVOT您可以执行以下操作 产生 查看有效的Data.SE示例 在不支持PIVOT的数据库中,您可以改为对表进行多次联接。尽管您可能还是想这样做,但是正如GBN所指出的那样,因为我们没有使用聚合。 查看有效的Data.SE示例
我正试图找出适合我所尝试的模型的正确语法。这是一个时间序列预测问题,我想在将时间序列输入LSTM之前,使用几个密集层来改进时间序列的表示。 下面是我正在处理的一个虚拟系列: 首先,我将拟合一个前面没有密集层的LSTM。这需要我重塑数据: 这是正确的吗? 这些教程使得单个时间序列的第一维度应该是1,然后是时间步数(1000),然后是协变量数(3)。但当我这样做时,模型就无法编译。 在这里,我编译并训
我在一个Apache Flink项目中遇到了以下情况。 具有不同对象的3个流,例如 Person->字符串id,字符串firstName,字符串lastName(即101,John,Doe) PersonDetail->字符串id,字符串地址,字符串城市,字符串电话号码,long personId(即99,Stefansplatz 1,+43066012345678,101) PersonAddD
我有两个数据帧,我需要连接一列,如果id包含在第二个数据帧的同一列中,则只从第一个数据帧中获取行: df1: 断续器: 期望输出: 我已经用df1.join(df2("id ")," left ")试过了,但是给我错误:“Dataframe”对象是不可调用的。
问题内容: 问:我怎样才能从读到的一切入的方式是不是一个手工制作的循环用我自己的字节的缓冲区? 问题答案: 编写一个方法来执行此操作,然后从需要该功能的任何地方调用它。番石榴已经在中提供了代码。我敢肯定,几乎所有其他具有“通用” IO功能的库也都有它,但是Guava是我第一个“入门”库。它震撼了:)
我有一个数据帧: 现在我想把它转换成一个新的数据帧,比如 我怎样才能用熊猫做到这一点?