当前位置: 首页 > 知识库问答 >
问题:

如何在使用schema Spark读取csv时删除格式错误的行?

栾景胜
2023-03-14

当我使用Spark DataSet加载csv文件时。我更喜欢清楚地指定模式。但是我发现有几行不符合我的模式。一列应该是双精度的,但有些行是非数值。是否可以轻松地从DataSet中过滤所有不符合我的模式的行?

val schema = StructType(StructField("col", DataTypes.DoubleType) :: Nil)
val ds = spark.read.format("csv").option("delimiter", "\t").schema(schema).load("f.csv")

f、 csv:

a
1.0

我更喜欢“a”可以很容易地从我的数据集中过滤出来。谢谢!

共有2个答案

仲高超
2023-03-14

<代码>。选项(“mode”,“DROPMALFORMED”)应该可以完成这项工作。

<代码>模式(默认允许):允许在解析期间处理损坏记录的模式。

>

  • 允许:当遇到损坏的记录时,将其他字段设置为null,并将格式错误的字符串放入由columnNameOfCorruptRecord配置的新字段中。当用户设置架构时,它会为额外字段设置null。

    DROPMALFORMED:忽略整个损坏的记录。

    FAILFAST:遇到损坏记录时引发异常。

  • 仉成益
    2023-03-14

    如果您正在读取CSV文件并希望删除与架构不匹配的行。您可以通过将选项mode添加DROPMALFORMED来做到这一点

    输入数据

    a,1.0
    b,2.2
    c,xyz
    d,4.5
    e,asfsdfsdf
    f,3.1
    

    架构

    val schema = StructType(Seq(
      StructField("key", StringType, false),
      StructField("value", DoubleType, false)
    ))
    

    使用模式和选项读取csv文件

      val df = spark.read.schema(schema)
        .option("mode", "DROPMALFORMED")
        .csv("/path to csv file ")
    

    输出:

    +-----+-----+
    |key  |value|
    +-----+-----+
    |hello|1.0  |
    |hi   |2.2  |
    |how  |3.1  |
    |you  |4.5  |
    +-----+-----+
    

    您可以在此处获得有关spark csv的更多详细信息

    希望这有帮助!

     类似资料:
    • 输出为 预期输出

    • 我收到的csv文件格式不正确(无法控制生成此csv的应用程序) CSV的标题和第一行如下所示: 这是我用来读取csv的代码: 这是我收到的输出: 第一个问题是奇怪的字符(可能缺少编码选项?)另外,标题是错误的,不能在该格式上使用DictReader,这对于编辑CSV非常有用。 我可以重写一个新的CSV与标题正确格式化,这不是一个问题,但我不知道如何跳过CSV的前3行!?或者我可以用CSV即将到来的

    • 我在Azure上有一个Databricks5.3集群,它运行Apache Spark 2.4.0和Scala 2.11。 我不是Java/Scala开发人员,也不熟悉Spark/Databricks。我找不到Spark用来解析值的datetime格式化程序。 我尝试的模式:

    • 问题内容: 我正在使用以下代码解压缩并保存CSV文件: 似乎一切正常,除了文件中的第一个字符是意外的事实。谷歌搜索似乎表明这是由于文件中的BOM。 我已经读过,将内容编码为utf-8-sig应该可以解决此问题。但是,添加: 到csv.reader中的f失败并显示: 如何删除BOM表并将其内容保存在正确的utf-8中? 问题答案: 首先,您需要解码文件内容,而不是对其进行编码。 其次,该模块不喜欢P

    • 问题内容: 我试图在每次单击按钮时将对象(类)添加到文件中。然后,我尝试从文件中读取所有对象,并将其加载到。但是我在反序列化函数的第二次迭代中得到了一个。我已经验证了第一次迭代可以正常工作。 我读过许多关于同一问题的其他文章: 一种建议反序列化多个Java对象是将整个数组列表而不是单个对象写入文件。但这似乎很浪费,因为每次单击按钮时我都需要写入文件,并且随着时间的推移将有数百个对象。 另一个建议S

    • Scala\u Spark\u DataFrameReader\u csv的文档表明,Spark可以记录在读取时检测到的格式错误的行。csv文件 -如何记录格式错误的行 -是否可以获取包含格式错误行的val或var? 链接文档中的选项是:maxMalformedLogPerPartition(默认值10):设置Spark将为每个分区记录的最大错误行数。超过此数字的格式错误记录将被忽略