异常消息如下
我的代码如下:
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.{ SparkConf, SparkContext }
import org.joda.time.DateTime
import org.joda.time.format.{ DateTimeFormat, DateTimeFormatter }
object DateTimeNullReferenceReappear extends App {
case class Record(uin: String = "", date: DateTime = null, value: Double = 0.0)
val cfg = new Configuration
val sparkConf = new SparkConf()
sparkConf.setAppName("bourne_exception_reappear")
val sc = new SparkContext(sparkConf)
val data = TDWSparkContext.tdwTable( // this function just read data from an data warehouse
sc,
tdwuser = FaceConf.TDW_USER,
tdwpasswd = FaceConf.TDW_PASSWORD,
dbName = "my_db",
tblName = "my_table",
parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
.map(row => {
Record(uin = row(2),
date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
value = row(4).toDouble)
}).map(x => (x.uin, (x.date, x.value)))
.groupByKey
.map(x => {
x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum) // throw exception here
})
// val data = TDWSparkContext.tdwTable( // It works, as I don't user datetime toString in the groupBy
// sc,
// tdwuser = FaceConf.TDW_USER,
// tdwpasswd = FaceConf.TDW_PASSWORD,
// dbName = "hy",
// tblName = "t_dw_cf_oss_tblogin",
// parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
// .map(row => {
// Record(uin = row(2),
// date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
// value = row(4).toDouble)
// }).map(x => (x.uin, (x.date.toString("yyyyMMdd"), x.value)))
// .groupByKey
// .map(x => {
// x._2.groupBy(_._1).mapValues(_.map(_._2).sum)
// })
data.take(10).map(println)
}
您需要禁用Kryo,使用Kryo JodaTime序列化器,或者避免序列化DateTime对象,即传递long。
我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。 案例1 案例2 案例3 案例4 文件/home/pvikash/data/test的内容。txt是: 这是一个测试文件。将用于rdd分区 基于以上案例,我有几个问题。 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1? 对于案例3,为什么在指定数量的
我想了解以下关于火花概念的RDD的事情。 > RDD仅仅是从HDFS存储中复制某个节点RAM中的所需数据以加快执行的概念吗? 如果一个文件在集群中被拆分,那么对于单个flie来说,RDD从其他节点带来所有所需的数据? 如果第二点是正确的,那么它如何决定它必须执行哪个节点的JVM?数据局部性在这里是如何工作的?
我一直在youtube上学习android的自定义视图等等。在我尝试做一个表面视图(相当简单的描述在这个视频)。 我所做的事情与视频中所显示的几乎是同步的。
本质上,我想对dStream中的每个元素应用一组函数。目前,我正在为pyspark.streaming.dstream使用“map”函数。根据文档,我的方法似乎是正确的。http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.dstream map(f,preservesPart
一个函数总是占用一段连续的内存区域,函数名在表达式中有时也会被转换为该函数所在内存区域的首地址,这和数组名非常类似。我们可以把函数的这个首地址(或称入口地址)赋予一个 指针变量,使指针变量指向函数所在的内存区域,然后通过指针变量就可以找到并调用该函数。这种指针就是 函数指针。 函数指针的定义形式为: returnType (*pointerName)(param list); returnType
问题内容: 在我的猪代码中,我这样做: 我想用spark做同样的事情。但是,不幸的是,我看到我必须成对进行: 是否有联合运算符可以让我一次对多个rdds进行操作: 例如 这是一个方便的问题。 问题答案: 如果这些是RDD,则可以使用方法: 没有等效项,但这只是一个简单的问题: 如果要在RDD上大量使用和重新创建,可能是避免与准备执行计划的成本相关的问题的更好选择: