当前位置: 首页 > 知识库问答 >
问题:

Spark 2数据集空值异常

白嘉石
2023-03-14

在spark数据集中获取此空错误。滤器

输入CSV:

name,age,stat
abc,22,m
xyz,,s

工作代码:

case class Person(name: String, age: Long, stat: String)

val peopleDS = spark.read.option("inferSchema","true")
  .option("header", "true").option("delimiter", ",")
  .csv("./people.csv").as[Person]
peopleDS.show()
peopleDS.createOrReplaceTempView("people")
spark.sql("select * from people where age > 30").show()

失败代码(添加以下行返回错误):

val filteredDS = peopleDS.filter(_.age > 30)
filteredDS.show()

返回空错误

java.lang.RuntimeException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "age")
- root class: "com.gcp.model.Person"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

共有1个答案

丁雅逸
2023-03-14

你得到的例外应该解释一切,但让我们一步一步来:

>

  • 当使用csv数据源加载数据时,所有字段都标记为nullable

    val path: String = ???
    
    val peopleDF = spark.read
      .option("inferSchema","true")
      .option("header", "true")
      .option("delimiter", ",")
      .csv(path)
    
    peopleDF.printSchema
    
    root
    |-- name: string (nullable = true)
    |-- age: integer (nullable = true)
    |-- stat: string (nullable = true)
    

    缺失的字段表示为SQLNULL

    peopleDF.where($"age".isNull).show
    
    +----+----+----+
    |name| age|stat|
    +----+----+----+
    | xyz|null|   s|
    +----+----+----+
    

    接下来,将Dataset[Row]转换为Dataset[Person],其中使用Longage字段进行编码。因为输入模式是nullable,所以输出模式仍然是nullable,尽管:

    val peopleDS = peopleDF.as[Person]
    
    peopleDS.printSchema
    
    root
     |-- name: string (nullable = true)
     |-- age: integer (nullable = true)
     |-- stat: string (nullable = true)
    

    请注意,它作为[T]完全不会影响模式。

    当您使用SQL(在注册表上)或DataFrameAPI查询Dataset时,Spark不会反序列化对象。由于模式仍然是可为空的,我们可以执行:

    peopleDS.where($"age" > 30).show
    
    +----+---+----+
    |name|age|stat|
    +----+---+----+
    +----+---+----+
    

    没有任何问题。这只是一个简单的SQL逻辑,NULL是一个有效值。

    当我们使用静态类型的DatasetAPI时:

    peopleDS.filter(_.age > 30)
    

    Spark必须反序列化对象。因为Long不能是null(SQLnull),所以它会失败,出现异常情况。

    如果不是因为这个,你会得到NPE。

    数据的正确静态类型表示应使用可选类型:

    case class Person(name: String, age: Option[Long], stat: String)
    

    具有调整过的过滤功能:

    peopleDS.filter(_.age.map(_ > 30).getOrElse(false))
    
    +----+---+----+
    |name|age|stat|
    +----+---+----+
    +----+---+----+
    

    如果愿意,可以使用模式匹配:

    peopleDS.filter {
      case Some(age) => age > 30
      case _         => false     // or case None => false
    }
    

    请注意,您不必(但还是建议)为namestat使用可选类型。因为ScalaString只是一个JavaString,所以它可以是null。当然,如果使用这种方法,则必须明确检查访问的值是否为null

    相关火花2.0数据集vs数据帧

  •  类似资料:
    • 此函数转换Hbase格式的数据 这是我在第125行得到的错误:hbaseputs.saveasnewapiHadoopDataSet(job.getconfiguration)

    • 我们正试图用普罗米修斯来获取火花指标。我们使用jmx导出器jmx_prometheus_javaagent-0.12.0.jar。 ./spark-submit--监督--部署模式集群--conf'spark.driver.extrajavaoptions=-javaagent:jars/jmx_prometheus_javaagent-0.12.0.jar=8060:/conf/spark.ym

    • 在我目前正在开发的Firebase Android应用程序中,我想提供一个导出特性。这个特性应该允许用户导出一组存储在Firebase中的数据。 我的计划是将所有需要的数据收集到一个中间对象(datastructure)中,该对象可以(重新)用于多种导出类型。 谁有应对这一问题的最佳做法?

    • 我正在尝试使用JDBC处理Presto上的查询,并将结果集传递回Spark,以便在其上创建临时表。我的结果集在列表中 我从kafka producer获得了json Msg形式的查询。因此,我们在spark中创建了kafka consumer,以获取信息并进行进一步处理。 以下是我的主要功能: 以下是将结果集返回给主函数的process_query方法: 但我仍然得到了这个错误输出 请帮帮忙

    • 使用Spark Dataset/DataFrame联接时,我面临长时间运行且OOM作业失败的问题。 以下是输入: ~10个不同大小的数据集,大部分是巨大的( 经过一番分析,我发现作业失败和缓慢的原因是歪斜键:当左侧有数百万条记录时,用连接键。 我用了一些蛮力的方法来解决这个问题,这里我想和大家分享一下。 如果您有更好的或任何内置的解决方案(针对常规Apache Spark),请与他人分享。

    • 我试图在数据帧中找到空值。虽然我回顾了Stackoverflow的以下文章,其中描述了确定空值的过程,但我很难对我的数据集执行相同的操作。 如何计算熊猫数据帧中列中的楠值 工作代码: 我做错了什么?