当前位置: 首页 > 知识库问答 >
问题:

默认情况下,Spark sql模式中的可空性是建议性的。严格执行它的最佳方法是什么?

雍光远
2023-03-14

我正在做一个简单的ETL项目,它读取CSV文件,对每一列进行一些修改,然后将结果作为JSON写出。我希望读取我的结果的下游流程确信我的输出符合约定的模式,但我的问题是,即使我为所有字段定义了nullable=false的输入模式,空值也可能偷偷进入并破坏我的输出文件,而且似乎没有(performant)方法可以让Spark为我的输入字段强制执行‘not null’。

这似乎是一个功能,如下所述,在火花,权威指南:

当您定义一个架构时,所有列都声明为没有空值,Spark不会强制执行该架构,并且会很乐意让空值进入该列。可为空的信号只是为了帮助 Spark SQL 优化以处理该列。如果列中的空值不应具有空值,则可能会得到不正确的结果或看到难以调试的奇怪异常。

我编写了一个小的检查实用程序来检查数据帧的每一行,如果在任何列(在任何嵌套级别,在map、struct或array之类的字段或子字段的情况下)中检测到空值,就会引发错误。)

我特别想知道:我是不是用这个检查工具重新插入了车轮?是否有任何现有的库或Spark技术可以为我做到这一点(最好是以比我实现的更好的方式)?

检查实用程序和我的管道的简化版本出现在下面。如上所述,对检查实用程序的调用被注释掉。如果您在未启用检查实用程序的情况下运行,您将在 /tmp/output.csv.中看到此结果

cat /tmp/output.json/*
(one + 1),(two + 1)
3,4
"",5

标题后面的第二行应该是一个数字,但它是一个空字符串(我想这就是火花写出空值的方式。)对于读取我的ETL作业输出的下游组件来说,这个输出会有问题:这些组件只想要整数。

现在,我可以通过取消注释该行来启用检查

   //checkNulls(inDf)

当我这样做时,我收到一个异常,它通知我无效的空值并打印出整个违规行,如下所示:

        java.lang.RuntimeException: found null column value in row: [null,4]

Spark/最终指南中给出了一种可能的替代方法

Spark,权威指南提到了这样做的可能性:

<dataframe>.na.drop() 

但这将(AFAIK)无声地丢弃坏记录,而不是标记坏记录。然后,我可以在删除之前和之后对输入进行“设置减法”,但这似乎是一个巨大的性能打击,以找出什么是空的,什么不是。乍一看,我更喜欢我的方法……但我仍然想知道是否有更好的方法。下面给出了完整的代码。谢谢

package org

import java.io.PrintWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// before running, do; rm -rf /tmp/out* /tmp/foo*
object SchemaCheckFailsToExcludeInvalidNullValue extends App {

  import NullCheckMethods._

  //val input = "2,3\n\"xxx\",4"          // this will be dropped as malformed
  val input = "2,3\n,4"                   // BUT.. this will be let through

  new PrintWriter("/tmp/foo.csv") { write(input); close }

  lazy val sparkConf = new SparkConf()
    .setAppName("Learn Spark")
    .setMaster("local[*]")
  lazy val sparkSession = SparkSession
    .builder()
    .config(sparkConf)
    .getOrCreate()
  val spark = sparkSession

  val schema = new StructType(
    Array(
      StructField("one", IntegerType, nullable = false),
      StructField("two", IntegerType, nullable = false)
    )
  )

  val inDf: DataFrame =
    spark.
      read.
      option("header", "false").
      option("mode", "dropMalformed").
      schema(schema).
      csv("/tmp/foo.csv")

  //checkNulls(inDf)

  val plusOneDf = inDf.selectExpr("one+1", "two+1")
  plusOneDf.show()

  plusOneDf.
    write.
    option("header", "true").
    csv("/tmp/output.csv")

}

object NullCheckMethods extends Serializable {

  def checkNull(columnValue: Any): Unit = {
    if (columnValue == null)
      throw new RuntimeException("got null")
    columnValue match {
      case item: Seq[_] =>
        item.foreach(checkNull)
      case item: Map[_, _] =>
        item.values.foreach(checkNull)
      case item: Row =>
        item.toSeq.foreach {
          checkNull
        }
      case default =>
        println(
          s"bad object [ $default ] of type: ${default.getClass.getName}")
    }
  }

  def checkNulls(row: Row): Unit = {
    try {
      row.toSeq.foreach {
        checkNull
      }
    } catch {
      case err: Throwable =>
        throw new RuntimeException(
          s"found null column value in row: ${row}")
    }
  }


  def checkNulls(df: DataFrame): Unit = {
    df.foreach { row => checkNulls(row) }
  }
}

共有1个答案

和魁
2023-03-14

您可以使用内置的 Row 方法 anyNull 来拆分数据帧,并以不同方式处理这两种拆分:

val plusOneNoNulls = plusOneDf.filter(!_.anyNull)
val plusOneWithNulls = plusOneDf.filter(_.anyNull)

如果您不打算使用手动空处理过程,使用内置DataFrame.na方法会更简单,因为它已经实现了自动处理空值的所有常用方法(即删除空值或用默认值填充空值)。

 类似资料:
  • 我的存储库中的以下代码: 使用REST服务执行这个方法(updateClientInfoById)会给我一个例外:javax。坚持不懈TransactionRequiredException:执行更新/删除查询 我必须添加@Transactional才能让它正常工作。 为什么存储库中的方法在默认情况下不都是事务性的? 提前感谢:)

  • 问题内容: 是否有适用于javascript的良好分析器?我知道firebug对分析代码提供了一些支持。但是我想确定更大范围的统计数据。想象一下,您正在构建大量的javascript代码,并且您想确定代码中实际上是什么瓶颈。首先,我想查看每个javascript函数和执行时间的配置文件统计信息。接下来将包括DOM函数。这与放慢速度的操作(如对渲染树的操作)相结合将是完美的。我认为,如果在我的代码,

  • 我有一个超时执行任务的方法。我使用ExecutorServer.submit()获取一个Future对象,然后调用future.get()并超时。这很好,但是我的问题是处理我的任务可能抛出的检查异常的最好方法。下面的代码工作正常,并且保留了被检查的异常,但是如果方法签名中被检查的异常的列表改变了,它看起来非常笨拙并且容易出错。 关于如何解决这个问题的任何建议?我需要以Java 5为目标,但我也很好

  • 问题内容: 当提交HTML表单而不指定方法时,默认的HTTP方法是什么?GET还是POST? 在HTML标准之间这种行为是否发生过变化? 请尽可能引用W3C标准文档。 问题答案: 是GET。 看一下W3C取代的建议书17.3FORM元素。 摘抄: 好读 HTML表单中的GET和POST方法-有什么区别?

  • 问题内容: 我需要比较两个对象(同一类的实例)中的许多字段,并做一些记录和更新,以防出现差异。元代码可能看起来像这样: 具有所有比较的代码非常简洁,我想以某种方式使其更紧凑。如果我有一个方法可以将setter和getter的调用作为参数并在所有字段中调用,那将是很好的,但是不幸的是,这对于Java是不可能的。 我提出了三个选择,每个选择都有其自身的缺点。 1.使用反射API来找出getter和se