当前位置: 首页 > 知识库问答 >
问题:

RDD映射中的Spark Scala序列化错误

乐寒
2023-03-14

我有一个RDD格式为RDD[((Long,Long),(Long,Long))],我需要隐藏或转换为RDD[(Long,Long),(Long,Long,Long,Long))],其中第二个RDD元组基于第一个RDD的函数。

我正在尝试实现这个基于地图的功能,但是,我认为在这里做了一些错误的事情。请帮我解决这个问题。

package com.ranker.correlation.listitem
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import scala.collection.Map

class ListItemCorrelation(sc: SparkContext) extends Serializable {

  def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) = {
    if (dirX.equals(1)) {
      if (dirY.equals(1)) {
        return (1, 0, 0, 0)
      } else {
        return (0, 1, 0, 0)
      }
    } else {
      if (dirY.equals(1)) {
        return (0, 0, 1, 0)
      } else {
        return (0, 0, 0, 1)
      }
    }
  }

  def run(votes: String):  RDD[((Long, Long), (Long, Long, Long, Long))]   = {
    val userVotes = sc.textFile(votes)
    val userVotesPairs = userVotes.map { t =>
      val p = t.split(",")
      (p(0).toLong, (p(1).toLong, p(2).toLong))
    }
    val jn = userVotesPairs.join(userVotesPairs).values.filter(t => t._1._1.<(t._2._1))
    val first = jn.map(t => ((t._1._1, t._2._1), (t._1._2, t._2._2)))
    var second = first.map(t => ((t._1._1, t._2._1), up_down(t._1._2, t._2._2)))
    //More functionality
    return result
  }

}
object ListItemCorrelation extends Serializable {
  def main(args: Array[String]) {
    val votes = args(0)
    val conf = new SparkConf().setAppName("SparkJoins").setMaster("local")
    val context = new SparkContext(conf)
    val job = new ListItemCorrelation(context)
    val results = job.run(votes)
    val output = args(1)
    results.saveAsTextFile(output)
    context.stop()
  }
}

共有1个答案

茹正初
2023-03-14

up_down方法放在伴生对象上。当在RDD闭包中访问任何类变量时,类(以及其中的所有内容,如SparkContext)都会被序列化。方法参数在这里算作类变量。使用静态对象可以解决以下问题:

package com.ranker.correlation.listitem
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import scala.collection.Map

object ListItemCorrelation {
  def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) = {
    if (dirX.equals(1)) {
      if (dirY.equals(1)) {
        return (1, 0, 0, 0)
      } else {
        return (0, 1, 0, 0)
      }
    } else {
      if (dirY.equals(1)) {
        return (0, 0, 1, 0)
      } else {
        return (0, 0, 0, 1)
      }
    }
  }
}


class ListItemCorrelation(sc: SparkContext) extends Serializable {

  def run(votes: String):  RDD[((Long, Long), (Long, Long, Long, Long))]   = {
    val userVotes = sc.textFile(votes)
    val userVotesPairs = userVotes.map { t =>
      val p = t.split(",")
      (p(0).toLong, (p(1).toLong, p(2).toLong))
    }
    val jn = userVotesPairs.join(userVotesPairs).values.filter(t => t._1._1.<(t._2._1))
    val first = jn.map(t => ((t._1._1, t._2._1), (t._1._2, t._2._2)))
    var second = first.map(t => ((t._1._1, t._2._1), ListItemCorrelation.up_down(t._1._2, t._2._2)))
    //More functionality
    return result
  }

}
object ListItemCorrelation extends Serializable {
  def main(args: Array[String]) {
    val votes = args(0)
    val conf = new SparkConf().setAppName("SparkJoins").setMaster("local")
    val context = new SparkContext(conf)
    val job = new ListItemCorrelation(context)
    val results = job.run(votes)
    val output = args(1)
    results.saveAsTextFile(output)
    context.stop()
  }
}
 类似资料:
  • 可以序列化/反序列化< code >映射吗 在这种特殊情况下,我知道总是,和 - 第三方类(我有序列化器和反序列化器),其他值是盒装原语。 有可能和杰克逊做这样的事吗?使用MapSerializer/MapDeserializer可以做到这一点吗?(我找不到任何例子)

  • 我对Jackson有一个错误的理解,就是将json文件反序列化为poco。这是我的代码: 我的POCO命名为AnimalBean: }还有我的JSON文件: } 当我执行我的代码时,我有以下错误:未识别的字段“动物园”(类动画豆),未标记为可忽略的。我知道问题是我的json文件开始不直接由动物,但我不能改变它,因为它不是我的。我已经尝试把对象apper.configure(Deseriazatio

  • 以下程序尝试为每一行(在RDD映射中)调用3个函数: 但有些错误输出: 它可能是由“隐式val格式=DefaultFormats”引起的。但我需要在“映射”之前提取值。

  • 目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。

  • 如何创建列colMap的ArrayType[StringType]哪个值是数组与元素是字符串匹配的列的名称哪些值为真? 我有这样的输入DataFrame: 我想创建这样的输出DataFrame: 编辑:我发现了这个重复的问题: Spark scala从多列中获取字符串类型的数组 但想知道是否有更好的方法来实现产出?