我正在做一个简单的ETL项目,它读取CSV文件,对每一列进行一些修改,然后将结果作为JSON写出。我希望读取我的结果的下游流程确信我的输出符合约定的模式,但我的问题是,即使我为所有字段定义了nullable=false的输入模式,空值也可能偷偷进入并破坏我的输出文件,而且似乎没有(performant)方法可以让Spark为我的输入字段强制执行‘not null’。
这似乎是一个功能,如下所述,在火花,权威指南:
当您定义一个架构时,所有列都声明为没有空值,Spark不会强制执行该架构,并且会很乐意让空值进入该列。可为空的信号只是为了帮助 Spark SQL 优化以处理该列。如果列中的空值不应具有空值,则可能会得到不正确的结果或看到难以调试的奇怪异常。
我编写了一个小的检查实用程序来检查数据帧的每一行,如果在任何列(在任何嵌套级别,在map、struct或array之类的字段或子字段的情况下)中检测到空值,就会引发错误。)
我特别想知道:我是不是用这个检查工具重新插入了车轮?是否有任何现有的库或Spark技术可以为我做到这一点(最好是以比我实现的更好的方式)?
检查实用程序和我的管道的简化版本出现在下面。如上所述,对检查实用程序的调用被注释掉。如果您在未启用检查实用程序的情况下运行,您将在 /tmp/output.csv.中看到此结果
cat /tmp/output.json/*
(one + 1),(two + 1)
3,4
"",5
标题后面的第二行应该是一个数字,但它是一个空字符串(我想这就是火花写出空值的方式。)对于读取我的ETL作业输出的下游组件来说,这个输出会有问题:这些组件只想要整数。
现在,我可以通过取消注释该行来启用检查
//checkNulls(inDf)
当我这样做时,我收到一个异常,它通知我无效的空值并打印出整个违规行,如下所示:
java.lang.RuntimeException: found null column value in row: [null,4]
Spark/最终指南中给出了一种可能的替代方法
Spark,权威指南提到了这样做的可能性:
<dataframe>.na.drop()
但这将(AFAIK)无声地丢弃坏记录,而不是标记坏记录。然后,我可以在删除之前和之后对输入进行“设置减法”,但这似乎是一个巨大的性能打击,以找出什么是空的,什么不是。乍一看,我更喜欢我的方法……但我仍然想知道是否有更好的方法。下面给出了完整的代码。谢谢
package org
import java.io.PrintWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// before running, do; rm -rf /tmp/out* /tmp/foo*
object SchemaCheckFailsToExcludeInvalidNullValue extends App {
import NullCheckMethods._
//val input = "2,3\n\"xxx\",4" // this will be dropped as malformed
val input = "2,3\n,4" // BUT.. this will be let through
new PrintWriter("/tmp/foo.csv") { write(input); close }
lazy val sparkConf = new SparkConf()
.setAppName("Learn Spark")
.setMaster("local[*]")
lazy val sparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
val spark = sparkSession
val schema = new StructType(
Array(
StructField("one", IntegerType, nullable = false),
StructField("two", IntegerType, nullable = false)
)
)
val inDf: DataFrame =
spark.
read.
option("header", "false").
option("mode", "dropMalformed").
schema(schema).
csv("/tmp/foo.csv")
//checkNulls(inDf)
val plusOneDf = inDf.selectExpr("one+1", "two+1")
plusOneDf.show()
plusOneDf.
write.
option("header", "true").
csv("/tmp/output.csv")
}
object NullCheckMethods extends Serializable {
def checkNull(columnValue: Any): Unit = {
if (columnValue == null)
throw new RuntimeException("got null")
columnValue match {
case item: Seq[_] =>
item.foreach(checkNull)
case item: Map[_, _] =>
item.values.foreach(checkNull)
case item: Row =>
item.toSeq.foreach {
checkNull
}
case default =>
println(
s"bad object [ $default ] of type: ${default.getClass.getName}")
}
}
def checkNulls(row: Row): Unit = {
try {
row.toSeq.foreach {
checkNull
}
} catch {
case err: Throwable =>
throw new RuntimeException(
s"found null column value in row: ${row}")
}
}
def checkNulls(df: DataFrame): Unit = {
df.foreach { row => checkNulls(row) }
}
}
您可以使用内置的 Row 方法 anyNull 来拆分数据帧,并以不同方式处理这两种拆分:
val plusOneNoNulls = plusOneDf.filter(!_.anyNull)
val plusOneWithNulls = plusOneDf.filter(_.anyNull)
如果您不打算使用手动空处理过程,使用内置DataFrame.na方法会更简单,因为它已经实现了自动处理空值的所有常用方法(即删除空值或用默认值填充空值)。
我的存储库中的以下代码: 使用REST服务执行这个方法(updateClientInfoById)会给我一个例外:javax。坚持不懈TransactionRequiredException:执行更新/删除查询 我必须添加@Transactional才能让它正常工作。 为什么存储库中的方法在默认情况下不都是事务性的? 提前感谢:)
Vaadin 7.6.2 BeanitemContainer
问题内容: 是否有适用于javascript的良好分析器?我知道firebug对分析代码提供了一些支持。但是我想确定更大范围的统计数据。想象一下,您正在构建大量的javascript代码,并且您想确定代码中实际上是什么瓶颈。首先,我想查看每个javascript函数和执行时间的配置文件统计信息。接下来将包括DOM函数。这与放慢速度的操作(如对渲染树的操作)相结合将是完美的。我认为,如果在我的代码,
我有一个超时执行任务的方法。我使用ExecutorServer.submit()获取一个Future对象,然后调用future.get()并超时。这很好,但是我的问题是处理我的任务可能抛出的检查异常的最好方法。下面的代码工作正常,并且保留了被检查的异常,但是如果方法签名中被检查的异常的列表改变了,它看起来非常笨拙并且容易出错。 关于如何解决这个问题的任何建议?我需要以Java 5为目标,但我也很好
问题内容: 当提交HTML表单而不指定方法时,默认的HTTP方法是什么?GET还是POST? 在HTML标准之间这种行为是否发生过变化? 请尽可能引用W3C标准文档。 问题答案: 是GET。 看一下W3C取代的建议书17.3FORM元素。 摘抄: 好读 HTML表单中的GET和POST方法-有什么区别?
问题内容: 我需要比较两个对象(同一类的实例)中的许多字段,并做一些记录和更新,以防出现差异。元代码可能看起来像这样: 具有所有比较的代码非常简洁,我想以某种方式使其更紧凑。如果我有一个方法可以将setter和getter的调用作为参数并在所有字段中调用,那将是很好的,但是不幸的是,这对于Java是不可能的。 我提出了三个选择,每个选择都有其自身的缺点。 1.使用反射API来找出getter和se