当前位置: 首页 > 知识库问答 >
问题:

在apache spark streaming中使用foreachRDD内部的db连接

巫马安怡
2023-03-14

在spark streaming中,我希望在处理每个批处理之前查询db,将结果存储在一个可以序列化并通过网络发送给执行者的hashmap中。

class ExecutingClass implements Serializable {
 init(DB db) {

   try(JavaStreamingContext jsc = new JavaStreamingContext(...)) {

   JavaPairInputDStream<String,String> kafkaStream = getKafkaStream(jsc);

   kafkaStream.foreachRDD(rdd -> {
   // this part is supposed to execute in the driver
  Map<String, String> indexMap = db.getIndexMap();// connects to a db, queries the results as a map

  JavaRDD<String> results = processRDD(rdd, indexMap);

  ...  

 }


  }
    JavaRDD<String> processRDD(JavaPairRDD<String, String> rdd,       Map<String,String> indexMap) {
 ... 
    }
    }

在上面的代码中,indexMap应该在驱动程序中初始化,得到的map用于处理RDD。在foreachRDD闭包外部声明indexMap时没有问题,但在内部声明时会出现序列化错误。这是什么原因呢?

我之所以要这样做,是为了确保每个批处理都有数据库中的最新值。我怀疑这是由于foreachRDD的闭包试图序列化闭包之外的一切。

共有1个答案

冷浩瀚
2023-03-14

您正在使用forEachRdd中的db对象(它是db的实例),因此spark尝试序列化db,为了避免这种情况,我们需要在forEachRdd中创建db连接(或者),您可以使用对象池,如以下文章http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/中所讨论的

 类似资料:
  • 我有一个关于DO的小插曲,其中有一个MONGODB实例。我可以通过“ssh”连接user@ip命令,但如果我想将其与Meteor(本地)连接,我需要使用“ssh-L port:localhost:portserver”进行端口转发user@ip-f-N“并使用带有端口的launch Meteor。要访问MONGODB,我删除了密码,因此您只能使用ssh进行访问,并按照DO指南上的建议启用了防火墙。

  • 我有一个pyspark数据帧(df1 ),它由10K行组成,数据帧看起来像- 另一个pyspark数据帧(df2)由100k记录组成,看起来像- 我想使用pyspark内连接,最终的数据帧看起来像- df2中mobile_no的长度是12,但df1中是10。我可以加入它,但这是昂贵的操作。使用pyspark有帮助吗?

  • 我在使用foreachRDD进行CSV数据处理时遇到异常。这是我的代码 我得到以下错误。伊奥。NotSerializableException:已启用数据流检查点,但具有其功能的数据流不可序列化。阿帕奇。火花流动。StreamingContext序列化堆栈:-对象不可序列化(类:org.apache.spark.streaming.StreamingContext,值:org.apache.spa

  • 我对MySQL和PHP非常陌生,我正在努力处理两个表之间的内部连接。我正在构建一个脚本,它读取一个os商务数据库,并告诉我哪些产品当前处于过期订单状态。为了使产品处于过期订单状态,products_attributes表中的值设置为'134',但是它只读取product_id,而不读取'products'表中的product_model。 我想在products_attributes表中选择值为'

  • 如何在临时文件中获取名称以便在此查询中设置? 谢谢。

  • 问题内容: 有人知道Django有多“模块化”吗?我是否可以仅使用ORM部分来获取映射到数据库表的类并知道如何从这些表中进行读取/写入? 如果没有,您会推荐什么作为“相当于Hibernate的Python”? 问题答案: 如果你喜欢Django的ORM,则“独立”使用它非常简单;我已经写了几种在Web上下文之外使用Django部分的技术,你可以自由使用其中的任何一种(或滚动使用)。 上面的Shan