当前位置: 首页 > 知识库问答 >
问题:

joda DateTime格式导致火花RDD函数中的空指针错误

秋阳旭
2023-03-14

异常消息如下

我的代码如下:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.{ SparkConf, SparkContext }
import org.joda.time.DateTime
import org.joda.time.format.{ DateTimeFormat, DateTimeFormatter }




object DateTimeNullReferenceReappear extends App {

  case class Record(uin: String = "", date: DateTime = null, value: Double = 0.0) 

  val cfg = new Configuration
  val sparkConf = new SparkConf()
  sparkConf.setAppName("bourne_exception_reappear")
  val sc = new SparkContext(sparkConf)

val data = TDWSparkContext.tdwTable(   // this function just read data from an data warehouse
  sc,
  tdwuser = FaceConf.TDW_USER,
  tdwpasswd = FaceConf.TDW_PASSWORD,
  dbName = "my_db",
  tblName = "my_table",
  parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
  .map(row => {
    Record(uin = row(2),
      date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
      value = row(4).toDouble)
  }).map(x => (x.uin, (x.date, x.value)))
  .groupByKey
  .map(x => {
    x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum)   // throw exception here
  })

//      val data = TDWSparkContext.tdwTable(  // It works, as I don't user datetime toString in the groupBy 
//      sc,
//      tdwuser = FaceConf.TDW_USER,
//      tdwpasswd = FaceConf.TDW_PASSWORD,
//      dbName = "hy",
//      tblName = "t_dw_cf_oss_tblogin",
//      parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
//      .map(row => {
//        Record(uin = row(2),
//          date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
//          value = row(4).toDouble)
//      }).map(x => (x.uin, (x.date.toString("yyyyMMdd"), x.value)))
//      .groupByKey
//      .map(x => {
//        x._2.groupBy(_._1).mapValues(_.map(_._2).sum)
//      })

  data.take(10).map(println)

}

共有1个答案

景成和
2023-03-14

您需要禁用Kryo,使用Kryo JodaTime序列化器,或者避免序列化DateTime对象,即传递long。

 类似资料:
  • 我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。 案例1 案例2 案例3 案例4 文件/home/pvikash/data/test的内容。txt是: 这是一个测试文件。将用于rdd分区 基于以上案例,我有几个问题。 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1? 对于案例3,为什么在指定数量的

  • 我想了解以下关于火花概念的RDD的事情。 > RDD仅仅是从HDFS存储中复制某个节点RAM中的所需数据以加快执行的概念吗? 如果一个文件在集群中被拆分,那么对于单个flie来说,RDD从其他节点带来所有所需的数据? 如果第二点是正确的,那么它如何决定它必须执行哪个节点的JVM?数据局部性在这里是如何工作的?

  • 我一直在youtube上学习android的自定义视图等等。在我尝试做一个表面视图(相当简单的描述在这个视频)。 我所做的事情与视频中所显示的几乎是同步的。

  • 本质上,我想对dStream中的每个元素应用一组函数。目前,我正在为pyspark.streaming.dstream使用“map”函数。根据文档,我的方法似乎是正确的。http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.dstream map(f,preservesPart

  • 一个函数总是占用一段连续的内存区域,函数名在表达式中有时也会被转换为该函数所在内存区域的首地址,这和数组名非常类似。我们可以把函数的这个首地址(或称入口地址)赋予一个 指针变量,使指针变量指向函数所在的内存区域,然后通过指针变量就可以找到并调用该函数。这种指针就是 函数指针。 函数指针的定义形式为: returnType (*pointerName)(param list); returnType

  • 问题内容: 在我的猪代码中,我这样做: 我想用spark做同样的事情。但是,不幸的是,我看到我必须成对进行: 是否有联合运算符可以让我一次对多个rdds进行操作: 例如 这是一个方便的问题。 问题答案: 如果这些是RDD,则可以使用方法: 没有等效项,但这只是一个简单的问题: 如果要在RDD上大量使用和重新创建,可能是避免与准备执行计划的成本相关的问题的更好选择: