我正在尝试连接spark streaming应用程序中的DB2数据库和导致“org.apache.spark.sparkException:Task not Serializable”问题的数据库查询执行语句。请指教。下面是我有的示例代码供参考。
dataLines.foreachRDD{rdd=>
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
val dataRows=rdd.map(rs => rs.value).map(row =>
row.split(",")(1)-> (row.split(",")(0), row.split(",")(1), row.split(",")(2)
, "cvflds_"+row.split(",")(3).toLowerCase, row.split(",")(4), row.split(",")(5), row.split(",")(6))
)
val db2Conn = getDB2Connection(spark,db2ConParams)
dataRows.foreach{ case (k,v) =>
val table = v._4
val dbQuery = s"(SELECT * FROM $table ) tblResult"
val df=getTableData(db2Conn,dbQuery)
df.show(2)
}
}
Below is other function code:
private def getDB2Connection(spark: SparkSession, db2ConParams:scala.collection.immutable.Map[String,String]): DataFrameReader = {
spark.read.format("jdbc").options(db2ConParams)
}
private def getTableData(db2Con: DataFrameReader,tableName: String):DataFrame ={
db2Con.option("dbtable",tableName).load()
}
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
下面是错误日志:
理想情况下,您应该保持datarows.foreach
中的闭包不包含任何连接对象,因为闭包要序列化到执行程序并在那里运行。这个概念在这个官方链接上有深入的讨论
在您的情况下,行以下是导致问题的关闭:
val df=GetTableData(db2Conn,dbQuery)
除了df.show(2)
之外,您没有提到您正在对表数据执行什么操作。如果行很大,那么您可以讨论更多关于您的用例的内容。也许,你需要考虑一个不同的设计。
无法解决以下由)触发的序列化问题。我认为可以解决序列化问题,但事实并非如此。那么,如何使用? 我假设变量和是不可序列化的,但是我如何正确地序列化它们,以便代码能够在集群上工作,而不仅仅是在本地工作呢? 上面显示的代码抛出错误:
考虑: 如果我们序列化Foo(),输出是: 我想要: 最干净的方法是什么?
问题内容: 在我们的应用程序中,我们使用Spark sql获取字段值作为列。我正在尝试弄清楚如何将列值放入嵌套的json对象并推送到Elasticsearch。还有一种方法可以参数化值以传递给正则表达式? 我们目前正在使用Spark Java API。 实际输出: 我们需要在节点“ txn_summary”下的上述列,例如以下json: 预期产量: 问题答案: 将所有列添加到顶层结构应提供预期的输
我知道我可以创建一个单独的Report类,然后使用@JSONProperty将其嵌入到ReportResponse中。有没有一种方法可以避免这种情况,并用一个注释标记ReportResponse类,将它映射到JSON中的“Report”元素?
我正在创建一个具有嵌套列表的API。Jackson似乎是一个创建对象的好工具,但我不太清楚如何嵌套列表,我想知道这是否可能。 我的对象看起来像这样。 我希望有一种方法可以将其映射到json,看起来像: 我们希望能够做到这一点,以便我们可以将属性添加到列表中。
问题内容: 在我当前的项目中,我在android中使用GSON库,并且遇到了嵌套地图反序列化的问题。这就是初始json的样子 而我的pojo的 和花类 但是当我尝试反序列化此对象时,我可以访问嵌套的哈希图,示例代码为 有什么建议? 问题答案: 这告诉Gson您想反序列化为未知值类型的Map。您可能会想指定类似的东西,但是您无法在Java中进行指定,因此解决方案是使用他们在Gson中称为TypeTo