我们使用的是基于Spark Streaming接收器的方法,我们刚刚启用了检查指向来解决数据丢失问题。
火花版本是1.6.1
,我们正在接收来自Kafka主题的消息。
我在内部使用了ssc
,foreachRDD
方法DStream
,所以它抛出了不可序列化的异常。
我试图扩展可序列化的类,但仍然是相同的错误。只有当我们启用检查点时,才会发生这种情况。
def main(args: Array[String]): Unit = {
val checkPointLocation = "/path/to/wal"
val ssc = StreamingContext.getOrCreate(checkPointLocation, () => createContext(checkPointLocation))
ssc.start()
ssc.awaitTermination()
}
def createContext (checkPointLocation: String): StreamingContext ={
val sparkConf = new SparkConf().setAppName("Test")
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(sparkConf, Seconds(40))
ssc.checkpoint(checkPointLocation)
val sc = ssc.sparkContext
val sqlContext: SQLContext = new HiveContext(sc)
val kafkaParams = Map("group.id" -> groupId,
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> sasl,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
"metadata.broker.list" -> brokerList,
"zookeeper.connect" -> zookeeperURL)
val dStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
dStream.foreachRDD(rdd =>
{
// using sparkContext / sqlContext to do any operation throws error.
// convert RDD[String] to RDD[Row]
//Create Schema for the RDD.
sqlContext.createDataFrame(rdd, schema)
})
ssc
}
错误日志:
2017-02-08 22:53:53250错误[驱动程序]流媒体。StreamingContext:启动上下文时出错,将其标记为已停止的java。伊奥。NotSerializableException:已启用数据流检查点,但具有其功能的数据流不可序列化。阿帕奇。火花SparkContext序列化堆栈:-对象不可序列化(类:org.apache.spark.SparkContext,值:org.apache.spark)。SparkContext@1c5e3677)-field(class:com.x.payments.RemedyDriver$$anonfun$main$1,name:sc$1,type:class org.apache.spark.SparkContext)-object(class:com.x.payments.RemedyDriver$$anonfun$main$1,)-field(class:org.apache.spark.streaming.dstream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3,name:cleanedF$1,type:interface scala.Function1)-object(类org.apache.spark.streaming.dstream.dstream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3,)-writeObject数据(类:org.apache.spark.stream.dstream.dstream)-object(类org.apache.spark.streaming.dstream.ForEachDStream,org.apache.spark.stream.dstream.dstream)。ForEachDStream@68866c5)-数组元素(索引:0)-数组(类[Ljava.lang.Object;,大小16)-字段(类:scala.collection.mutable.ArrayBuffer,名称:数组,类型:类[Ljava.lang.Object;)-对象(类scala.collection.mutable.ArrayBuffer,ArrayBuffer)(org.apache.spark.streaming.dstream)。ForEachDStream@68866c5))-writeObject数据(类:org.apache.spark.streaming.dstream.dstream checkpointdata)-对象(类org.apache.spark.stream.dstream.dstream checkpointdata,[0个检查点文件
])-写入对象数据(类:org.apache.spark.streaming.dstream.DStream)-对象(类org.apache.spark.streaming.kafka.KafkaInputDStream,org.apache.spark.streaming.kafka.KafkaInputDStream@acd8e32)-数组元素(索引:0)-数组(类[Ljava.lang.Object;,大小16)-字段(类:scala.collection.mutable.ArrayBuffer,名称:数组,类型:类[Ljava.lang.Object;)-对象(类scala.collection.mutable.ArrayBuffer,ArrayBuffer(org.apache.spark.streaming.kafka.KafkaInputDStream@acd8e32))-写入对象数据(类:org.apache.spark.streaming.DStreamGough)-对象(类org.apache.spark.streaming.DStreamGough,org.apache.spark.streaming.DStreamGraph@6935641e)-字段(类:org.apache.spark.streaming.Checkpoint,名称:图,类型:类org.apache.spark.streaming.DStreamGough)-对象(类org.apache.spark.streaming.Checkpoint,org.apache.spark.streaming.Checkpoint@484bf033)在org.apache.spark.streaming.StreamingContext.validate(StreamingContexts. scala:557)在org. apache. spark. stream。StreamingContexte. liftedTree11美元(StreamingContexts. scala:601)在org. apache. spak. stream。StreamingContexte. start(StreamingContexte. scala:600)在com. x. pay。RemedyDriver$. main(RemedyDriver. scala:104)在com. x. pay。RemedyDriver. main(RemedyDriver. scala)在sun. reect。NativeomeodAccessorInm. Invoke(NativeomeodAccessorInm. java:62)在sun. reect。Application Master$anon2 Dollars. run(Application. master. scala: 559)2017-02-08 22:53:53,250 ERROR[驱动程序]付款。RemedyDriver$:DStream检查点已启用,但DStreams及其功能不可序列化org. apache. spark。SparkContext序列化堆栈:-对象不可序列化(类:org. apache. spark。SparkContext,值:org.apache.spark.SparkContext@1c5e3677)-字段(类:com. x. Payments。RemedyDriver$anonfun$main 1美元,名称:sc1美元,类型:类org. apache. sparkContext)-对象(类com. x. Payments。RemedyDriver$anonfun$main 1美元,)-字段(类:org. apache. spark. Streing. dstream。DStream$anonfun$ForeachRDD1美元$anonfun$应用$mcV$sp3美元,名称:清洁F1DStream$$anonfun$ForeachRDD1美元$$anonfun$申请$mcV$sp3美元,)-写入对象数据(类:org. apache. sak. Streing. dstream. DStream)-对象(类org. apache. spark. Streing. dstream. ForEachDStream,org.apache.spark.streaming.dstream.ForEachDStream@68866c5)-数组元素(索引:0)-数组(类[Ljava. lang. Object;,大小16)-字段(类:Scala.集合. Mutable. ArrayBuffer,名称:数组,类型:类[Ljava. lang. Object;)-对象(类Scala.集合. Mutable. ArrayBuffer,ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@68866c5))-写入对象数据(类:org. apache. spark. Streing. dStreamCheckpoint Data)-对象(类org. apache. spark. Streing. dstream. DStreamCheckpoint Data,[0检查点文件
])-writeObject数据(类:org.apache.spark.streaming.dstream.dstream)-对象(类:org.apache.spark.streaming.kafka.kafkainputdtream,org.apache.spark.streaming.kafka)。KafkaInputDStream@acd8e32)-数组元素(索引:0)-数组(类[Ljava.lang.Object;,大小16)-字段(类:scala.collection.mutable.ArrayBuffer,名称:数组,类型:类[Ljava.lang.Object;)-对象(类scala.collection.mutable.ArrayBuffer,ArrayBuffer)(org.apache.spark.streaming.kafka)。KafkaInputDStream@acd8e32))-writeObject数据(类:org.apache.spark.streaming.DStreamGraph)-对象(类org.apache.spark.streaming.DStreamGraph,org.apache.spark.streaming)。DStreamGraph@6935641e)-字段(类:org.apache.spark.streaming.Checkpoint,名称:graph,类型:class org.apache.spark.streaming.DStreamGraph)-对象(类:org.apache.spark.streaming.Checkpoint,org.apache.spark.streaming)。Checkpoint@484bf033)2017-02-08 22:53:53255信息[司机]纱线。应用程序管理员:最终应用程序状态:成功,退出代码:0
更新:
基本上,我们试图做的是,将rdd转换为DF(数据流的内部foreachRDD方法),然后在此基础上应用DF API,最后将数据存储在Cassandra中。所以我们使用sqlContext将rdd转换为DF,这一次它会抛出错误。
如果要访问SparkContext
,请通过rdd
值:
dStream.foreachRDD(rdd => {
val sqlContext = new HiveContext(rdd.context)
val dataFrameSchema = sqlContext.createDataFrame(rdd, schema)
}
这是:
dStream.foreachRDD(rdd => {
// using sparkContext / sqlContext to do any operation throws error.
val numRDD = sc.parallelize(1 to 10, 2)
log.info("NUM RDD COUNT:"+numRDD.count())
}
导致SparkContext
在闭包中被序列化,这是不可能的,因为它是不可序列化的。
问题在于Spark数据集和INT列表的序列化。Scala版本是2.10.4,Spark版本是1.6。 这和其他问题类似,但是我不能基于这些回答让它工作。我已经简化了代码,以便仅仅显示问题。 我有一门案例课: 我的主要方法是: 我得到以下错误: 如果我从FlightExt中删除列表,那么一切正常,这表明lambda函数序列化没有问题。 Scala本身似乎序列化了一系列Int的优点。也许Spark在序
我有一个小的scala代码snipet: 当我在windows或mac上运行它时,它会打印出一个巨大的异常信息: 这是什么错误?我该如何修复它?我不太明白。谢谢!
我目前有一个RESTfulWebService跑步服。我最近添加了一个过滤器,它可以执行一些auth操作,并且可以在happy path的情况下工作。但是,当我需要从这个过滤器中抛出一个错误时,它不会将异常序列化为一个漂亮的json字符串,而是抛出一个500,错误如下: 问题是,我不想在应用程序/八位字节流中写入任何内容。我的服务只使用。这在我的实际类中不是问题,我可以在这些类中指定注释。从资源主
我正在开发一个应用程序,它使用Gson作为JSON反序列化器,需要从REST API反序列化多态JSON。在解释mi问题之前,请注意,我已经用Gson研究了多态反序列化,并在几个案例中成功地实现了它。这是我面临的一个具体问题。在问这个问题之前,我也读过这篇很棒的帖子和关于堆栈溢出的讨论。顺便说一下,我正在使用RuntimeTypeAdapterFactory来反序列化多态对象。 我遇到的问题是,G
问题内容: 我在android / java中对Location的子类进行序列化遇到了麻烦 位置不可序列化。我有一个名为FALocation的第一个子类,它没有任何实例变量。我已经宣布它可序列化。 然后,我有一个名为Waypoint的第二个类,看起来像这样: 序列化工作正常。 反序列化会产生跟随翼异常(腿对象包含一个航路点): 问题答案: 序列化位置绝对必要吗?也许您可以将其标记为瞬态,并在反序列