当前位置: 首页 > 知识库问答 >
问题:

Spark SQL-使用架构读取csv

郎吉星
2023-03-14

我在尝试使用Spark简单读取CSV文件时遇到了这个问题。在这样的操作之后,我想确保:

  • 数据类型是正确的(使用提供的模式)
  • 根据提供的架构,标头是正确的

这是我使用的代码,并且有问题:

val schema = Encoders.product[T].schema
val df = spark.read
 .schema(schema)
 .option("header", "true")
 .csv(fileName)

类型为产品类型,即案例类。这是可行的,但它不会检查列名是否正确,因此我可以提供另一个文件,只要数据类型正确,就不会发生错误,而且我不知道用户提供了错误的文件,但由于某种程度上的巧合,正确的数据类型具有正确的顺序。

我尝试使用推断架构的选项,然后在Dataset上使用. as[T]方法,但如果String以外的任何列仅包含null,则Spark将其解释为String列,但在我的架构中它是整数。所以发生了强制转换异常,但列名已检查无误。

总结一下:我找到了可以确保正确数据类型但没有标头的解决方案,以及可以验证标头但数据类型有问题的其他解决方案。有没有办法同时实现这两个目标,即。e.完全验证标头和类型?

我正在使用Spark 2.2.0。

共有1个答案

吴宏扬
2023-03-14

看起来您必须自己读取两次文件头。

看看Spark的代码,如果用户提供他们自己的模式,那么推断出的头将被完全忽略(从未真正读取),因此无法使Spark在这种不一致性上失败。

要自己进行此比较,请执行以下操作:

val schema = Encoders.product[T].schema

// read the actual schema; This shouldn't be too expensive as Spark's
// laziness would avoid actually reading the entire file 
val fileSchema = spark.read
  .option("header", "true")
  .csv("test.csv").schema

// read the file using your own schema. You can later use this DF
val df = spark.read.schema(schema)
  .option("header", "true")
  .csv("test.csv")

// compare actual and expected column names:
val badColumnNames = fileSchema.fields.map(_.name)
  .zip(schema.fields.map(_.name))
  .filter { case (actual, expected) => actual != expected }

// fail if any inconsistency found:
assert(badColumnNames.isEmpty, 
  s"file schema does not match expected; Bad column names: ${badColumnNames.mkString("; ")}")
 类似资料:
  • 我正在尝试使用SparkSQL将我的数据库导出到我的S3中的镶木地板格式。 我的一张表包含行大小 Spark似乎有一个限制:使用Avro/Parket的Spark作业中的最大行大小。但不确定是否是这样。 有解决方法吗?

  • 我的猜测是我没有以正确的方式声明模式文档,但我不知道错误到底在哪里。我做错了什么?

  • 我正在从Impala迁移到SparkSQL,使用以下代码读取一个表: 我如何调用上面的SparkSQL,这样它就可以返回这样的东西:

  • 当我运行以下命令时: 这些列打印为“_col0”、“_col1”、“_col2”等。而不是它们的真实名称,如“empno”、“name”、“Deptno”。 当我在Hive中“description mytable”时,它会正确打印列名,但当我运行“orcfiledump”时,它也会显示\u col0、\u col1、\u col2。我必须指定“schema on read”或其他什么吗?如果是,

  • 我的spring-security.xml是 忽略XML验证警告org.XML.sax.saxParseException:schema_reference.4:无法读取架构文档“http://www.springframework.org/schema/security/spring-security-3.2.xsd”,因为1)找不到文档;2)无法读取文档;3)文档的根元素不是。 我导入的库是:

  • 有没有解决这个问题的方法???我无法读取KAFKA-AVRO架构消息。我正在尝试将消息从logstash发送到KAFKA到hdfs。 以下是技术堆栈: LogStash 2.3-当前生产版本 汇流3.0。 插件:A。Logstash-kafka-Output插件B。logstash-codec-avro。 动物园管理员:3.4.6 Kafka:0.10.0.0 Logstash配置文件如下所示: