当前位置: 首页 > 知识库问答 >
问题:

用Spark验证CSV文件列

路裕
2023-03-14

我试图在Spark中读取一个CSV文件(应该有一个头),并将数据加载到一个现有的表中(带有预定义的列和数据类型)。csv文件可能非常大,所以如果csv的列标题不是“有效”的,我可以避免这样做,那就太好了。

当我当前读取文件时,我指定StructType作为模式,但这并不能验证标头是否包含正确顺序的正确列。这是我到目前为止所做的(我正在另一个地方构建“schema”结构类型):

sqlContext
  .read()
  .format("csv")
  .schema(schema)
  .load("pathToFile");

如果我添加<代码>。option("header "," true)" line它将跳过csv文件的第一行,并使用我在StructType的< code>add方法中传递的名称。(例如,如果我用“id”和“name”构建StructType,并且csv中的第一行是“idzzz,name”,那么得到的dataframe将有“id”和“name”列。我希望能够验证csv标题与我计划加载csv的表具有相同的列名。

我尝试用< code >读取文件。head(),并对第一行进行一些检查,但这样会下载整个文件。

任何建议都非常欢迎。

共有1个答案

漆雕原
2023-03-14

据我所知,您希望验证您读取的CSV的模式。schema选项的问题在于,它的目标是告诉spark它是数据的模式,而不是检查它是否是。

但是,有一个选项可以在读取CSV时推断所述架构,并且在您的情况下可能非常有用(推断Schema)。然后,您可以将该架构与您期望的等式架构进行比较,或者执行我将介绍的小解决方法,使其更加宽松。

让我们看看它是如何工作的以下文件:

a,b
1,abcd
2,efgh

然后,让我们读数据。我使用了scala REPL,但您应该能够非常轻松地将所有这些转换为Java。

val df = spark.read
    .option("header", true) // reading the header
    .option("inferSchema", true) // infering the sschema
    .csv(".../file.csv")
// then let's define the schema you would expect
val schema = StructType(Array(StructField("a", IntegerType),
                              StructField("b", StringType)))

// And we can check that the schema spark inferred is the same as the one
// we expect:
schema.equals(df.schema)
// res14: Boolean = true

更进一步

这是一个完美的世界。事实上,如果您的模式包含例如不可为空的列或其他小差异,这种基于对象严格相等的解决方案将不起作用。

val schema2 = StructType(Array(StructField("a", IntegerType, false),
                               StructField("b", StringType, true)))
// the first column is non nullable, it does not work because all the columns
// are  nullable when inferred by spark:
schema2.equals(df.schema)
// res15: Boolean = false

在这种情况下,您可能需要实现适合您的模式比较方法:

def equalSchemas(s1 : StructType, s2 : StructType) = {
  s1.indices
    .map(i => s1(i).name.toUpperCase.equals(s2(i).name.toUpperCase) &&
              s1(i).dataType.equals(s2(i).dataType))
    .reduce(_ && _)
}
equalSchemas(schema2, df.schema)
// res23: Boolean = true

我正在检查列的名称和类型是否匹配,以及顺序是否相同。您可能需要根据需要实现不同的逻辑。

 类似资料:
  • 我是Spark的新手,我正在尝试使用Spark从文件中读取CSV数据。以下是我正在做的: 我希望这个调用会给我一个文件前两列的列表,但我遇到了以下错误: 索引器中第1行的文件“”:列表索引超出范围 虽然我的CSV文件不止一列。

  • 我正在尝试使用新的spark 2.1 CSV选项将数据帧保存到CSV中 一切都很好,我不介意使用part-000XX前缀,但现在似乎添加了一些UUID作为后缀 任何人都知道我如何删除此文件ext并只保留part-000XX Convention 谢啦

  • 我想在Zeppelin中阅读csv文件,并想使用Databricks的spark-csv包:https://github.com/databricks/spark-csv 提前感谢!

  • 我对Spark和Scala是新手。我们将广告事件日志文件格式化为CSV,然后使用PKZIP进行压缩。我已经看到了许多关于如何使用Java解压缩压缩文件的示例,但是如何使用Scala for Spark来实现这一点呢?我们最终希望从每个传入文件中获取、提取并加载数据到Hbase目标表中。也许这可以用HadooprDD来完成吗?在这之后,我们将引入Spark streaming来监视这些文件。

  • 问题内容: 我正在研究可分析android应用程序的Java SE应用程序。 是否有任何可用于验证AndroidManifest.xml的架构/ DTD?如果找到名称空间定义,例如: 但是schemas.android.com不能解析为实际主机,因此那里没有架构文件:-( 注:我 不 希望做验证在Android作为一个平台。 问题答案: Android XML没有DTD /方案 ,您必须依靠其他可