在spark数据集中获取此空错误。滤器
输入CSV:
name,age,stat
abc,22,m
xyz,,s
工作代码:
case class Person(name: String, age: Long, stat: String)
val peopleDS = spark.read.option("inferSchema","true")
.option("header", "true").option("delimiter", ",")
.csv("./people.csv").as[Person]
peopleDS.show()
peopleDS.createOrReplaceTempView("people")
spark.sql("select * from people where age > 30").show()
失败代码(添加以下行返回错误):
val filteredDS = peopleDS.filter(_.age > 30)
filteredDS.show()
返回空错误
java.lang.RuntimeException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "age")
- root class: "com.gcp.model.Person"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
你得到的例外应该解释一切,但让我们一步一步来:
>
当使用csv
数据源加载数据时,所有字段都标记为nullable
:
val path: String = ???
val peopleDF = spark.read
.option("inferSchema","true")
.option("header", "true")
.option("delimiter", ",")
.csv(path)
peopleDF.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- stat: string (nullable = true)
缺失的字段表示为SQLNULL
peopleDF.where($"age".isNull).show
+----+----+----+
|name| age|stat|
+----+----+----+
| xyz|null| s|
+----+----+----+
接下来,将Dataset[Row]
转换为Dataset[Person]
,其中使用Long
对age
字段进行编码。因为输入模式是nullable
,所以输出模式仍然是nullable
,尽管:
val peopleDS = peopleDF.as[Person]
peopleDS.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- stat: string (nullable = true)
请注意,它作为[T]完全不会影响模式。
当您使用SQL(在注册表上)或DataFrame
API查询Dataset
时,Spark不会反序列化对象。由于模式仍然是可为空的,我们可以执行:
peopleDS.where($"age" > 30).show
+----+---+----+
|name|age|stat|
+----+---+----+
+----+---+----+
没有任何问题。这只是一个简单的SQL逻辑,NULL
是一个有效值。
当我们使用静态类型的Dataset
API时:
peopleDS.filter(_.age > 30)
Spark必须反序列化对象。因为Long
不能是null
(SQLnull
),所以它会失败,出现异常情况。
如果不是因为这个,你会得到NPE。
数据的正确静态类型表示应使用可选
类型:
case class Person(name: String, age: Option[Long], stat: String)
具有调整过的过滤功能:
peopleDS.filter(_.age.map(_ > 30).getOrElse(false))
+----+---+----+
|name|age|stat|
+----+---+----+
+----+---+----+
如果愿意,可以使用模式匹配:
peopleDS.filter {
case Some(age) => age > 30
case _ => false // or case None => false
}
请注意,您不必(但还是建议)为
name
和stat
使用可选类型。因为ScalaString
只是一个JavaString
,所以它可以是null
。当然,如果使用这种方法,则必须明确检查访问的值是否为null
。
相关火花2.0数据集vs数据帧
此函数转换Hbase格式的数据 这是我在第125行得到的错误:hbaseputs.saveasnewapiHadoopDataSet(job.getconfiguration)
我们正试图用普罗米修斯来获取火花指标。我们使用jmx导出器jmx_prometheus_javaagent-0.12.0.jar。 ./spark-submit--监督--部署模式集群--conf'spark.driver.extrajavaoptions=-javaagent:jars/jmx_prometheus_javaagent-0.12.0.jar=8060:/conf/spark.ym
在我目前正在开发的Firebase Android应用程序中,我想提供一个导出特性。这个特性应该允许用户导出一组存储在Firebase中的数据。 我的计划是将所有需要的数据收集到一个中间对象(datastructure)中,该对象可以(重新)用于多种导出类型。 谁有应对这一问题的最佳做法?
我正在尝试使用JDBC处理Presto上的查询,并将结果集传递回Spark,以便在其上创建临时表。我的结果集在列表中 我从kafka producer获得了json Msg形式的查询。因此,我们在spark中创建了kafka consumer,以获取信息并进行进一步处理。 以下是我的主要功能: 以下是将结果集返回给主函数的process_query方法: 但我仍然得到了这个错误输出 请帮帮忙
使用Spark Dataset/DataFrame联接时,我面临长时间运行且OOM作业失败的问题。 以下是输入: ~10个不同大小的数据集,大部分是巨大的( 经过一番分析,我发现作业失败和缓慢的原因是歪斜键:当左侧有数百万条记录时,用连接键。 我用了一些蛮力的方法来解决这个问题,这里我想和大家分享一下。 如果您有更好的或任何内置的解决方案(针对常规Apache Spark),请与他人分享。
我试图在数据帧中找到空值。虽然我回顾了Stackoverflow的以下文章,其中描述了确定空值的过程,但我很难对我的数据集执行相同的操作。 如何计算熊猫数据帧中列中的楠值 工作代码: 我做错了什么?