当前位置: 首页 > 知识库问答 >
问题:

无法直接从Spark RDD写入/保存数据以Ignite

柯鸿云
2023-03-14

我尝试使用jdbc编写dataframe来ignite,

Spark版本为:2.1

点燃版本:2.3

JDK:1.8

Scala:2.11.8

这是我的代码段:

def WriteToIgnite(hiveDF:DataFrame,targetTable:String):Unit = {

  val conn = DataSource.conn
  var psmt:PreparedStatement = null

  try {
    OperationIgniteUtil.deleteIgniteData(conn,targetTable)

    hiveDF.foreachPartition({
      partitionOfRecords => {
        partitionOfRecords.foreach(
          row => for ( i <- 0 until row.length ) {
            psmt = OperationIgniteUtil.getInsertStatement(conn, targetTable, hiveDF.schema)
            psmt.setObject(i+1, row.get(i))
            psmt.execute()
          }
        )
      }
    })

  }catch {
    case e: Exception =>  e.printStackTrace()
  } finally {
    conn.close
  }
}

然后运行spark,它会显示错误消息

异常:任务不可序列化在org.apache.spark.util.closurecleaner$.ensureclealizable(closurecleaner.scala:298)在org.apache.spark.util.closurecleaner$.org$apache.spark.util.closurecleaner$$clean(closurecleaner.scala:288)在org.apache.spark.util.closurecleaner$.clean(closurecleaner.scala:108)在org.apache.spark.sparkcontext.clean(t org.apache.spark.sql.dataset$$anonfun$foreachpartition$1.apply(dataset.scala:2305)在org.apache.spark.sql.execution.sqlexecution$.withnewexecutionid(sqlexecution.scala:57)在org.apache.spark.sql.dataset.withnewexecutionid(dataset.scala:2765)在org.apache.spark.sql.dataset.foreachpartition(dataset.scala:2304)在(sparksubmit.scala:187)在org.apache.spark.deploy.sparksubmit.scala:212)在org.apache.spark.deploy.sparksubmit.main(sparksubmit.scala:126)在org.apache.spark.deploy.sparksubmit.main(sparksubmit.scala)上由于:java.io.NotSerializable异常:org.apache.ignite.internal.jdbc2.jdbcConnection序列化堆栈:-对象不可序列化(类Scala:100)在org.apache.spark.util.closurecleaner$.ensurecleanalizable(closurecleaner.scala:295)...27

有人知道我能修好它吗?谢谢!

共有2个答案

茅慈
2023-03-14

您必须扩展可序列化接口。

object Test extends Serializable { 
  def WriteToIgnite(hiveDF:DataFrame,targetTable:String):Unit = {
   ???
  }
}

我希望它能解决你的问题。

单展
2023-03-14

这里的问题是您无法序列化连接以点燃datasource.conn。您为foreachpartition提供的闭包包含连接作为其作用域的一部分,这就是Spark不能序列化它的原因。

幸运的是,Ignite提供了RDD的自定义实现,允许您将值保存到RDD中。您需要首先创建IgniteContext,然后检索Ignite的共享RDD,该共享RDD提供对Ignite的分布式访问,以保存RDD的:

val igniteContext = new IgniteContext(sparkContext, () => new IgniteConfiguration())
...

// Retrieve Ignite's shared RDD
val igniteRdd = igniteContext.fromCache("partitioned")
igniteRDD.saveValues(hiveDF.toRDD)

更多信息可以从Apache Ignite文档中获得。

 类似资料:
  • 07-24 12:36:23.742: W/System.err(10386):java.io.IO异常:拒绝许可07-24 12:36:23.750: W/System.err(10386): atjava.io.File.createNewFileImpl(本地方法)07-24 12:36:23.750: W/System.err(10386): atjava.io.File.createNe

  • SqliteOpenHelper只是一个工具,是SQL世界和OOP之间的一个通道。我们要新建几个类来请求已经保存在数据库中的数据,和保存新的数据。被定义的类会使用ForecastDbHelper和DataMapper来转换数据库中的数据到domain models。我仍旧使用默认值的方式来实现简单的依赖注入: class ForecastDb( val forecastDbHelper:

  • 我正在尝试用Firebase制作一个简单的Android应用程序。 目前,我只是试图从Firebase实时保存用户数据库。但由于某种原因它不起作用。另一方面,身份验证工作完美。 数据库本身没有显示任何内容。它总是显示“null” 我一直在关注这个YouTube教程,并一直在做一些类似的事情。 当我尝试创建一个用户并将其发送到数据库时,运行的程序没有给出任何答案。我有一个进度条,一旦setValue

  • 这是使用电子邮件密码注册帐户的所有代码,保存验证电子邮件,将用户数据保存到Firestore数据库中。只有Firestore数据库无法运行。 else { Toast.makeText(register.this, “Error ! ” task.getException().getMessage(), Toast.LENGTH_SHORT).show();progressBar.setVisib