当前位置: 首页 > 知识库问答 >
问题:

Spark无法使用筛选器中的UDF序列化任务

赖明煦
2023-03-14

null

代码段:

val fun = (df: DataFrame) => {

format.setLenient(false)
val cannotBeDate = udf((column: String) => column != null && Try(format.parse(column)).isFailure)
val maybeCannotBeDateCount = Try(df.filter(cannotBeDate(new Column(columnName))).count);

/** Utility to persist all of the bad records   **/

val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

//Writing all Bad records
//val intermediateYriteToHiveDf = df.filter(cannotBeDate(new Column(columnName)))
val writeToHiveDf = df.filter(cannotBeDate(new Column(columnName)))

var recordLists = new ListBuffer[List[(String, String, String)]]()
writeToHiveDf.rdd.collect().foreach {
  row =>
    val item = row.mkString("-")
    val recordList: List[(String, String, String)] = List(List(tableName, "ALWAYS_NULL_CONSTRAINT", item))
      .map { case List(a, b, c) => (a, b, c) }
    recordLists += recordList
}
val listRDD = sc.parallelize(recordLists.flatten)
val dataFrameToHive: DataFrame = listRDD.toDF("table_name", "constraint_applied", "data")
dataFrameToHive.write.mode("append").saveAsTable("xdqdemo.bad_records")



DateFormatConstraintResult(
  this,
  data = maybeCannotBeDateCount.toOption.map(DateFormatConstraintResultData),
  status = ConstraintUtil.tryToStatus[Long](maybeCannotBeDateCount, _ == 0)
)

}

共有1个答案

史弘致
2023-03-14
 object checkConstraint extends Serializable{
  def checkDateFormat(format: SimpleDateFormat,df: DataFrame): DataFrame = {
    format.setLenient(false)
    val checkDateFormat = (column: String) => Try(format.parse(column)).isFailure
    val cannotBeDate = udf((column: String) => column != null && checkDateFormat(column))
    df.filter(cannotBeDate(new Column(columnName)))
  }
}


val writeToHiveDf = checkConstraint.checkDateFormat(format,df)

因此,所有的计算都打包在一个singleton对象中,该对象返回所需的数据

 类似资料:
  • 我有一个这样的转变: Pageview是一种:Pageview。JAVA 我在Spark上注册了这样的课程: 异常线程"main"org.apache.spark.SparkExctive:任务不能在org.apache.spark.util.ClosureCleaner$. ensureSerializable(ClosureCleaner.scala:166)在org.apache.spark

  • 在我的程序中,我有一个返回一些RDD的方法,我们称它为,它接受一个不可序列化的参数,并让RDD的类型为(我真正的RDD是元组类型,但只包含基元类型)。 当我尝试这样的事情时: 我得到的。 当我用替换(即某个常数)时,它会运行。 从序列化跟踪中,它试图序列化,并在那里阻塞,但我仔细检查了我的方法,这个对象从未出现在RDD中。 当我试图直接收集的输出时,即 我也没有问题。 该方法使用获取(本地)值序列

  • 问题内容: 我正在尝试CSS过滤器,但在我的Firefox(15.0)浏览器中不起作用。 HTML: CSS: 问题答案: GrayScale具有使用-moz-filter在Firefox中运行的限制。 要使其正常工作,请使用以下代码段:

  • 这个示例直接取自Spark示例代码,所以我不太清楚到底发生了什么。 我在localhost上运行的Spark独立集群上运行这个。 工人始终失败: 我运行的是Java 11,使用的是Spark 3.0.1。 我确实发现了一个非常相似的问题,看起来它就是答案:java。lang.ClassCastException在远程服务器上的spark作业中使用lambda表达式 然而,在确保将TestSpark

  • 我有一个包含一些字符串值的列表。我想与另一个字符串进行比较来迭代列表。只有当另一个字符串与列表中的任何元素都不匹配时,那么我才应该进入循环。我试过下面这样的东西,但没有效果。在Java8中有没有其他替代方法可以实现同样的功能? 注意:在循环中,我将向同一列表中添加更多的元素。因此,为了避免,我使用if条件进行验证。