当前位置: 首页 > 知识库问答 >
问题:

运行时计算的函数不能很好地处理Spark数据集/rdd

和光启
2023-03-14

我有一个火花应用程序。我的用例是允许用户定义一个类似< code>Record =的任意函数

以下是代码:


    //Sample rows with Id, Name, DOB and address
    val row1 = "19283,Alan,1989-01-20,445 Mount Eden Road Mount Eden Auckland"
    val row2 = "15689,Ben,1989-01-20,445 Mount Eden Road Mount Eden Auckland"

    val record1 = new Record(
      new RecordMetadata(),
      row1,
      true
    )
    val record2 = new Record(
      new RecordMetadata(),
      row2,
      true
    )

    val inputRecsList = List(record1, record2)
    val inputRecs = spark.sparkContext.parallelize(inputRecsList)

    val rule = ScalaExpression(
      //Sample rule. A lambda (Record => Record)
      """
        | import model.Record
        | { record: Record => record }
      """.stripMargin

    val outputRecs = inputRecs.map(rule.transformation)

以下是“Record”、“RecordMetadata”和“ScalaExpression”类的定义:

case class Record(
                   val metadata: RecordMetadata,
                   val row: String,
                   val isValidRecord: Boolean = true
                 ) extends Serializable

案例类 RecordMetadata() 扩展 可序列化

case class ScalaExpression(function: Function1[Record, Record]) extends Rule {

  def transformation = function
}

object ScalaExpression{

  /**
    * @param Scala expression as a string
    * @return Evaluated result of type Function (Record => Record)
    */
  def apply(string: String) = {
    val toolbox = currentMirror.mkToolBox()
    val tree = toolbox.parse(string)
    val fn = toolbox.eval(tree).asInstanceOf[(Record => Record)] //Or Function1(Record, Record)
    new ScalaExpression(fn)
  }
}

上面的代码引发了一个神秘的异常:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

但是,如果规则直接在代码中定义,则代码运行良好:val rule = ScalaExpression( {record: Record =

如果将地图(带有运行时评估规则)应用于列表,而不是RDD/数据集,代码也能很好地工作。

为了让它工作,我被困了一段时间。任何帮助都将不胜感激。

编辑:标记为此问题的“可能重复”正在解决一个完全不同的问题。我的用例尝试在运行时从用户那里获取规则(将一条记录转换为另一条记录的有效 scala 语句),并且在尝试将规则应用于数据集的每条记录时会导致序列化问题。

此致敬意。


共有1个答案

谷梁嘉悦
2023-03-14

Spark JIRA有一个开放的问题来解决这个问题-Spark-20525这个问题的原因是因为在加载Spark UDF时,Spark类加载器不匹配。

解决方法是在解释器之后加载spark会话。请查找示例代码。你也可以参考我的github,例如SparkCustomTransformations

trait CustomTransformations extends Serializable {
  def execute(spark: SparkSession, df: DataFrame, udfFunctions: AnyRef*): DataFrame
}

// IMPORTANT spark session should be lazy evaluated
lazy val spark = getSparkSession

def getInterpretor: scala.tools.nsc.interpreter.IMain = {

  import scala.tools.nsc.GenericRunnerSettings
  import scala.tools.nsc.interpreter.IMain

  val cl = ClassLoader.getSystemClassLoader
  val conf = new SparkConf()
  val settings = new GenericRunnerSettings(println _)
  settings.usejavacp.value = true

  val intp = new scala.tools.nsc.interpreter.IMain(settings, new java.io.PrintWriter(System.out))
  intp.setContextClassLoader
  intp.initializeSynchronous

  intp
}

val intp = getInterpretor

val udf_str =
  """
    (str:String)=>{
      str.toLowerCase
    }
    """
val customTransStr =
  """
    |import org.apache.spark.SparkConf
    |import org.apache.spark.sql.{DataFrame, SparkSession}
    |import org.apache.spark.sql.functions._
    |
    |new CustomTransformations {
    |    override def execute(spark: SparkSession, df: DataFrame, func: AnyRef*): DataFrame = {
    |
    |      //reading your UDF
    |      val str_lower_udf = spark.udf.register("str_lower", func(0).asInstanceOf[Function1[String,String]])
    |
    |      df.createOrReplaceTempView("df")
    |      val df_with_UDF_cols = spark.sql("select a.*, str_lower(a.fakeEventTag) as customUDFCol1 from df a").withColumn("customUDFCol2", str_lower_udf(col("fakeEventTag")))
    |
    |      df_with_UDF_cols.show()
    |      df_with_UDF_cols
    |    }
    |}
  """.stripMargin

intp.interpret(udf_str)
var udf_obj = intp.eval(udf_str)

val eval = new com.twitter.util.Eval
val customTransform: CustomTransformations = eval[CustomTransformations](customTransStr)


val sampleSparkDF = getSampleSparkDF
val outputDF = customTransform.execute(spark, sampleSparkDF, udf_obj)

outputDF.printSchema()
outputDF.show()
 类似资料:
  • 我正在寻找一种方法,将我的大型spark数据集划分为组/批,并在某些函数中处理这组行。所以基本上,这组行应该被输入到我的函数中,输出是我的单位,因为我不想聚合或更新输入记录,只是执行一些计算。 为了理解,假设我有以下输入。 假设我需要按col1和col2分组,这将给我以下分组 (1, A,1),(1, A,4),(1, A,5)--- (1,B,2)--- (1,C,3),(1,C,6)--- (

  • 我有一个时间窗口,我尝试确定我是否在一段时间内获得一个新的密钥。我正在通过kafka推送数据,当我调试它时,我看到数据到达方法,但它没有到达方法,并且没有被收集器收集。我正在使用来分配水印: 如有任何协助,我将不胜感激

  • 好吧,我对使用Scala/Spark还比较陌生,我想知道是否有一种设计模式可以在流媒体应用程序中使用大量数据帧(几个100k)? 在我的示例中,我有一个SparkStreaming应用程序,其消息负载类似于: 因此,当用户id为123的消息传入时,我需要使用特定于相关用户的SparkSQL拉入一些外部数据,并将其本地缓存,然后执行一些额外的计算,然后将新数据持久保存到数据库中。然后对流外传入的每条

  • 有时候,能够知道一个计算执行消耗的时间是非常有意义的,尤其是在对比和基准测试中。最简单的一个办法就是在计算开始之前设置一个起始时候,再由计算结束时的结束时间,最后取出它们的差值,就是这个计算所消耗的时间。想要实现这样的做法,可以使用 time 包中的 Now() 和 Sub 函数: start := time.Now() longCalculation() end := time.Now() de

  • Q2-缓存如何帮助这里获得更好的性能,可以使用什么缓存策略?(仅限Mem、Mem和磁盘等)

  • 我目前正在一个小型集群(3个节点,32个CPU和128 GB Ram)上使用线性回归(Spark ML)中的基准测试来评估Spark 2.1.0。我只测量了参数计算的时间(不包括启动、数据加载、……)并识别了以下行为。对于小的数据集,0.1MIO-3MIO数据点,测量的时间并没有真正增加,停留在大约40秒。只有对于较大的数据集,如300个Mio数据点,处理时间才会达到200秒。因此,集群似乎根本无