我尝试使用jdbc编写dataframe来ignite,
Spark版本为:2.1
点燃版本:2.3
JDK:1.8
Scala:2.11.8
这是我的代码段:
def WriteToIgnite(hiveDF:DataFrame,targetTable:String):Unit = {
val conn = DataSource.conn
var psmt:PreparedStatement = null
try {
OperationIgniteUtil.deleteIgniteData(conn,targetTable)
hiveDF.foreachPartition({
partitionOfRecords => {
partitionOfRecords.foreach(
row => for ( i <- 0 until row.length ) {
psmt = OperationIgniteUtil.getInsertStatement(conn, targetTable, hiveDF.schema)
psmt.setObject(i+1, row.get(i))
psmt.execute()
}
)
}
})
}catch {
case e: Exception => e.printStackTrace()
} finally {
conn.close
}
}
然后运行spark,它会显示错误消息:
异常:任务不可序列化在org.apache.spark.util.closurecleaner$.ensureclealizable(closurecleaner.scala:298)在org.apache.spark.util.closurecleaner$.org$apache.spark.util.closurecleaner$$clean(closurecleaner.scala:288)在org.apache.spark.util.closurecleaner$.clean(closurecleaner.scala:108)在org.apache.spark.sparkcontext.clean(t org.apache.spark.sql.dataset$$anonfun$foreachpartition$1.apply(dataset.scala:2305)在org.apache.spark.sql.execution.sqlexecution$.withnewexecutionid(sqlexecution.scala:57)在org.apache.spark.sql.dataset.withnewexecutionid(dataset.scala:2765)在org.apache.spark.sql.dataset.foreachpartition(dataset.scala:2304)在(sparksubmit.scala:187)在org.apache.spark.deploy.sparksubmit.scala:212)在org.apache.spark.deploy.sparksubmit.main(sparksubmit.scala:126)在org.apache.spark.deploy.sparksubmit.main(sparksubmit.scala)上由于:java.io.NotSerializable异常:org.apache.ignite.internal.jdbc2.jdbcConnection序列化堆栈:-对象不可序列化(类Scala:100)在org.apache.spark.util.closurecleaner$.ensurecleanalizable(closurecleaner.scala:295)...27
有人知道我能修好它吗?谢谢!
您必须扩展可序列化接口。
object Test extends Serializable {
def WriteToIgnite(hiveDF:DataFrame,targetTable:String):Unit = {
???
}
}
我希望它能解决你的问题。
这里的问题是您无法序列化连接以点燃datasource.conn
。您为foreachpartition
提供的闭包包含连接作为其作用域的一部分,这就是Spark不能序列化它的原因。
幸运的是,Ignite提供了RDD的自定义实现,允许您将值保存到RDD中。您需要首先创建IgniteContext
,然后检索Ignite的共享RDD,该共享RDD提供对Ignite的分布式访问,以保存RDD的行
:
val igniteContext = new IgniteContext(sparkContext, () => new IgniteConfiguration())
...
// Retrieve Ignite's shared RDD
val igniteRdd = igniteContext.fromCache("partitioned")
igniteRDD.saveValues(hiveDF.toRDD)
更多信息可以从Apache Ignite文档中获得。
刀 服务
07-24 12:36:23.742: W/System.err(10386):java.io.IO异常:拒绝许可07-24 12:36:23.750: W/System.err(10386): atjava.io.File.createNewFileImpl(本地方法)07-24 12:36:23.750: W/System.err(10386): atjava.io.File.createNe
SqliteOpenHelper只是一个工具,是SQL世界和OOP之间的一个通道。我们要新建几个类来请求已经保存在数据库中的数据,和保存新的数据。被定义的类会使用ForecastDbHelper和DataMapper来转换数据库中的数据到domain models。我仍旧使用默认值的方式来实现简单的依赖注入: class ForecastDb( val forecastDbHelper:
我正在尝试用Firebase制作一个简单的Android应用程序。 目前,我只是试图从Firebase实时保存用户数据库。但由于某种原因它不起作用。另一方面,身份验证工作完美。 数据库本身没有显示任何内容。它总是显示“null” 我一直在关注这个YouTube教程,并一直在做一些类似的事情。 当我尝试创建一个用户并将其发送到数据库时,运行的程序没有给出任何答案。我有一个进度条,一旦setValue
这是使用电子邮件密码注册帐户的所有代码,保存验证电子邮件,将用户数据保存到Firestore数据库中。只有Firestore数据库无法运行。 else { Toast.makeText(register.this, “Error ! ” task.getException().getMessage(), Toast.LENGTH_SHORT).show();progressBar.setVisib
提前谢谢!!