当前位置: 首页 > 知识库问答 >
问题:

Spark流式嵌套执行序列化问题

柯昆杰
2023-03-14

我正在尝试连接spark streaming应用程序中的DB2数据库和导致“org.apache.spark.sparkException:Task not Serializable”问题的数据库查询执行语句。请指教。下面是我有的示例代码供参考。

        dataLines.foreachRDD{rdd=>
          val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)

          val dataRows=rdd.map(rs => rs.value).map(row =>
            row.split(",")(1)-> (row.split(",")(0), row.split(",")(1), row.split(",")(2)
              , "cvflds_"+row.split(",")(3).toLowerCase, row.split(",")(4), row.split(",")(5), row.split(",")(6))
          )

          val db2Conn = getDB2Connection(spark,db2ConParams)

          dataRows.foreach{ case (k,v) =>
              val table = v._4
              val dbQuery = s"(SELECT * FROM $table ) tblResult"
              val df=getTableData(db2Conn,dbQuery)
              df.show(2)
          }
        }


Below is other function code:

  private def getDB2Connection(spark: SparkSession, db2ConParams:scala.collection.immutable.Map[String,String]): DataFrameReader = {
      spark.read.format("jdbc").options(db2ConParams)
  }

  private def getTableData(db2Con: DataFrameReader,tableName: String):DataFrame ={
      db2Con.option("dbtable",tableName).load()
  }



object SparkSessionSingleton {

  @transient  private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

下面是错误日志:

共有1个答案

索寒
2023-03-14

理想情况下,您应该保持datarows.foreach中的闭包不包含任何连接对象,因为闭包要序列化到执行程序并在那里运行。这个概念在这个官方链接上有深入的讨论

在您的情况下,行以下是导致问题的关闭:

val df=GetTableData(db2Conn,dbQuery)

除了df.show(2)之外,您没有提到您正在对表数据执行什么操作。如果行很大,那么您可以讨论更多关于您的用例的内容。也许,你需要考虑一个不同的设计

 类似资料:
  • 无法解决以下由)触发的序列化问题。我认为可以解决序列化问题,但事实并非如此。那么,如何使用? 我假设变量和是不可序列化的,但是我如何正确地序列化它们,以便代码能够在集群上工作,而不仅仅是在本地工作呢? 上面显示的代码抛出错误:

  • 考虑: 如果我们序列化Foo(),输出是: 我想要: 最干净的方法是什么?

  • 我知道我可以创建一个单独的Report类,然后使用@JSONProperty将其嵌入到ReportResponse中。有没有一种方法可以避免这种情况,并用一个注释标记ReportResponse类,将它映射到JSON中的“Report”元素?

  • 问题内容: 在我们的应用程序中,我们使用Spark sql获取字段值作为列。我正在尝试弄清楚如何将列值放入嵌套的json对象并推送到Elasticsearch。还有一种方法可以参数化值以传递给正则表达式? 我们目前正在使用Spark Java API。 实际输出: 我们需要在节点“ txn_summary”下的上述列,例如以下json: 预期产量: 问题答案: 将所有列添加到顶层结构应提供预期的输

  • 我正在创建一个具有嵌套列表的API。Jackson似乎是一个创建对象的好工具,但我不太清楚如何嵌套列表,我想知道这是否可能。 我的对象看起来像这样。 我希望有一种方法可以将其映射到json,看起来像: 我们希望能够做到这一点,以便我们可以将属性添加到列表中。