这个问题与这个主题有关: Spark 2.2 Scala 数据帧从字符串数组中选择,捕获错误
我需要区分缺少列的记录(这在我的用例中不是错误)和具有不适用于列类型的垃圾值的记录。
在执行selectExpr之后,这两种情况在结果数据帧中都显示为null。我正在寻找一种快速的方法,将缺少列的记录包含在好的结果中,同时将具有垃圾值的记录放入坏桶中。不好的可能包括像一个值为空字符串的int字段,或者“abc”。
例如,假设我有这样的DataFrame:Col A-string、Col B-int、Col C-string,
A B C
"x", "", "" - Error, bad value for B
"", null,"" - Good, missing value for B
"", "a", "" - Bad, bad value for B
"x", "1", "x" - Good, normal case
-----编辑-----
显示创建数据帧的代码。数据以json的形式出现,所有字段都用引号引起来,因此它最初认为所有内容都是字符串。我需要键入几个字段以 int、布尔值等。有关完整详细信息,请参阅顶部的链接。
val cols = dfLower.columns
val typedCols = cols.map( c => getTypeStmtForCol(c, qradarType) )
val result = dfLower.selectExpr(typedCols: _*)
// This puts both records with missing columns and bad values in bad.
// Need way to distinguish between those 2 cases.
val bad = dfLower.where(typedCols.map(expr(_).isNull).reduce(_ || _))
val good = result.na.drop()
-编辑2 -
我想我可能有主意了。如果我可以计算前后每条记录中的空值数量,那么只有那些在select之后有更多空值的记录才会出错。不确定如何实现...
有点快速和肮脏,但是创建一个udf来测试您的条件并根据条件的结果返回一个状态。
def checkIntData=udf((columnData: String) => {
var status = "GOOD"
try{
columnData.toInt
} catch {
case ex: Exception => {
if(columnData == null) {
// Do nothing. This is fine
} else if(columnData.length == 0) {
status = "ERROR"
} else {
status = "BAD"
}
}
}
status
})
val seqData = Seq(("x","","","0"),("",null,"","3"),("","a","","z"),("x","1","x",""))
val df = seqData.toDF("col1","col2","col3","col4")
val colsToCheck = df.select("col2","col4").columns
var newdf = df
// Iterate over the columns you want to check inside the dataframe. Each checked column will add a new status column to newdf
colsToCheck.map{column =>
newdf = newdf.withColumn(column+"Status", checkIntData(newdf(column)))
}
newdf.show()
这将返回以下内容:
+----+----+----+----+----------+----------+
|col1|col2|col3|col4|col2Status|col4Status|
+----+----+----+----+----------+----------+
| x| | | 0| ERROR| GOOD|
| |null| | 3| GOOD| GOOD|
| | a| | | BAD| ERROR|
| x| 1| x| z| GOOD| BAD|
+----+----+----+----+----------+----------+
然后,您可以通过根据状态列进行过滤来创建错误存储桶。
第1到3列来自您的示例。我添加了第4列来展示如何将其应用于多个列,而不必编写100次. with Colpillar()
。我通过创建一个列数组colsToCheck
,然后迭代以将udf应用于所有选定的列来实现这一点。
注意!因为我这样做可能会被吼,所以我想让你知道,使用try/catch作为流控制被认为是一种反模式(也就是糟糕的编程)。阅读更多内容以找出原因。
[新加入Spark]语言-Scala 根据文档,RangePartitioner对元素进行排序并将其划分为块,然后将块分发到不同的机器。下面的例子说明了它是如何工作的。 假设我们有一个数据框,有两列,一列(比如“a”)的连续值从1到1000。还有另一个数据帧具有相同的模式,但对应的列只有4个值30、250、500、900。(可以是任意值,从1到1000中随机选择) 如果我使用RangePartit
我读到过,太多的小分区会因为开销而损害性能,例如,向执行器发送大量任务。 使用最大的分区的缺点是什么?例如,为什么我会看到100s的MB范围内的建议? 如果丢失了一个分区,则需要进行大量的重新计算。对于许多较小的分区,您可能会更经常地丢失分区,但在运行时中的差异会更小。 如果在大分区上执行的少数任务中有一个任务的计算时间比其他任务长,这将使其他核心未被利用,但使用较小的分区,可以更好地在集群中分配
并将其应用于数据表的一列--这是我希望这样做的: 我还没有找到任何简单的方法,正在努力找出如何做到这一点。一定有一个更简单的方法,比将数据rame转换为和RDD,然后从RDD中选择行来获得正确的字段,并将函数映射到所有的值,是吗?创建一个SQL表,然后用一个sparkSQL UDF来完成这个任务,这更简洁吗?
我想过滤掉具有“c2”列前3个字符的记录,无论是“MSL”还是“HCP”。 所以输出应该如下所示。 有谁能帮忙吗? 我知道df。过滤器($c2.rlike(“MSL”))--用于选择记录,但如何排除记录? 版本:Spark 1.6.2 Scala:2.10
我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。 案例1 案例2 案例3 案例4 文件/home/pvikash/data/test的内容。txt是: 这是一个测试文件。将用于rdd分区 基于以上案例,我有几个问题。 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1? 对于案例3,为什么在指定数量的