当前位置: 首页 > 知识库问答 >
问题:

数据流检查点已启用,但数据流及其功能不可序列化

勾海超
2023-03-14

我想把DStream发送到Kafka,但它仍然不起作用。

searchWordCountsDStream.foreachRDD(rdd =>
rdd.foreachPartition(

    partitionOfRecords =>
    {
      val props = new HashMap[String, Object]()

      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outbroker)

      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String,String](props)

      partitionOfRecords.foreach
      {
        case (x:String,y:String)=>{
          println(x)
          val message=new ProducerRecord[String, String](outtopic,null,x)
          producer.send(message)
        }
      }
      producer.close()
    })
)

以下是一些错误信息:

16/10/31 14:44:15错误StreamingContext:错误启动上下文,将其标记为停止java.io.NotSerializableException:DStream检查点已启用,但DStreams及其功能不可序列化spider.app.job.MeetMonitor序列化堆栈:-对象不可序列化(类:spider.app.job.MeetMonitor,值:spider.app.job.MeetMonitor@433c6abb)-字段(类:spider.app.job.MeetMonitor$$anonfun$createContext2美元,名称:$外,类型:类spider.app.job.MeetMonitor)-对象(类spider.app.job.MeetMonitor$anonfun$createContext2美元,)-字段(类:org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD1美元$anonfun$应用$mcV$sp3美元,名称:清洁F1美元,类型:接口scala。函数1)-对象(类org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD1美元$$anonfun$应用$mcV$sp3美元,)-写对象数据(类:org.apache.spark.streaming.dstream.DStream)-对象(类org.apache.spark.streaming.dstream.ForEachDStream,org.apache.spark.streaming.dstream.ForEachDStream@3ac3f6f)-写对象数据(类:org.apache.spark.streaming.dstream.DStreamCheckpoint Data)-对象(类org.apache.spark.streaming.dstream.DStreamCheckpoint Data,[0检查点文件

])-writeObject数据(类:org.apache.spark.streaming.dstream.dstream)-对象(类:org.apache.spark.streaming.dstream.ForEachDStream,org.apache.spark.streaming.dstream)。ForEachDStream@6f9c5048)-数组元素(索引:0)-数组(类[Ljava.lang.Object;,大小16)-字段(类:scala.collection.mutable.ArrayBuffer,名称:数组,类型:类[Ljava.lang.Object;)-对象(类scala.collection.mutable.ArrayBuffer,ArrayBuffer)(org.apache.spark.streaming.dstream)。ForEachDStream@6f9c5048,org。阿帕奇。火花流动。数据流。ForEachDStream@3ac3f6f))-writeObject数据(类:org.apache.spark.streaming.dstream.dstream checkpointdata)-对象(类org.apache.spark.stream.dstream.dstream checkpointdata,[0个检查点文件

])

共有2个答案

朱鹏
2023-03-14

我一直在使用Spark 2.3.0版本,遇到了同样的问题,我只是通过为它抛出错误的类实现Serializable接口解决了这个问题。

你的情况是蜘蛛。应用程序。工作MeetMonitor应该像这样实现它:。

公共类MeetMonitor实现可序列化{

// ... }

另一件事是,如果您在类中使用Logger,请注意它的实例也不可序列化,因此可能会导致相同的问题。这也可以通过将其定义为:

私有静态最终记录器记录器=Logger.get记录器(. class);

穆浩皛
2023-03-14

我遇到了同样的问题在这里找到了答案

https://forums.databricks.com/questions/382/why-is-my-spark-streaming-application-throwing-a-n.html

似乎将检查点与foreachRDD一起使用会导致问题。删除代码中的检查点后,一切正常。

P/S.我只是想发表评论,但我没有足够的声誉这么做。

 类似资料:
  • 我试图将PipelineOptions接口传递给dataflow DoFn,以便DoFn可以配置一些它需要重新实例化的不可序列化的东西,但是当我告诉dataflow保存我的PipelineOptions子类的实例时,它似乎无法序列化DoFn。我需要对Options接口做什么才能使其正确序列化吗? DoFn定义 选项未标记时的序列化异常

  • 我有一个关于Ignite流媒体部分的问题。 我所理解的是,这是一种将数据导入缓存的方式,但我也看到,我们可以配置流接收器来应用一些其他的自定义逻辑。 所以我尝试创建一个包含接收器的类和一个将数据注入流的类(因此在服务器模式下有2个main和2个Ignite实例),但我“只是”将数据放入流的缓存中(接收器中没有任何自定义逻辑处理)。所以,我在问我是不是错过了什么,或者是我不太理解什么是流到点燃。 如

  • 我当前正尝试将Dataflow与pub/sub一起使用,但出现以下错误: 工作流失败。原因:(6E74E8516C0638CA):刷新凭据时出现问题。请检查:1。已为项目启用Dataflow API。2.您的项目有一个机器人服务帐户:service-[project number]@dataflow-service-producer-prod.iam.gserviceAccount.com应该可以

  • 我正在使用以下设置: 我的记录大小大约是2000字节。并查看“Grid-Data-Loader-Flusher”线程状态,如下所示: 线程数平均最长持续时间网格-数据-加载器-冲洗器-#100 38 4,737,793.579 30,427,862 180,036,156 数据流的最佳配置是什么? 谢谢

  • 严格的单向数据流是 Redux 架构的设计核心。 这意味着应用中所有的数据都遵循相同的生命周期,这样可以让应用变得更加可预测且容易理解。同时也鼓励做数据范式化,这样可以避免使用多个且独立的无法相互引用的重复数据。 如果这些理由还不足以令你信服,读一下 动机 和 Flux 案例,这里面有更加详细的单向数据流优势分析。虽然 Redux 不是严格意义上的 Flux,但它们有共同的设计思想。 Redux

  • 有时,您希望发送非常巨量的数据到客户端,远远超过您可以保存在内存中的量。 在您实时地产生这些数据时,如何才能直接把他发送给客户端,而不需要在文件 系统中中转呢? 答案是生成器和 Direct Response。 基本使用 下面是一个简单的视图函数,这一视图函数实时生成大量的 CSV 数据, 这一技巧使用了一个内部函数,这一函数使用生成器来生成数据,并且 稍后激发这个生成器函数时,把返回值传递给一个