当我使用Spark DataSet加载csv文件时。我更喜欢清楚地指定模式。但是我发现有几行不符合我的模式。一列应该是双精度的,但有些行是非数值。是否可以轻松地从DataSet中过滤所有不符合我的模式的行?
val schema = StructType(StructField("col", DataTypes.DoubleType) :: Nil)
val ds = spark.read.format("csv").option("delimiter", "\t").schema(schema).load("f.csv")
f、 csv:
a
1.0
我更喜欢“a”可以很容易地从我的数据集中过滤出来。谢谢!
<代码>。选项(“mode”,“DROPMALFORMED”)应该可以完成这项工作。
<代码>模式(默认允许):允许在解析期间处理损坏记录的模式。
>
允许:当遇到损坏的记录时,将其他字段设置为null,并将格式错误的字符串放入由columnNameOfCorruptRecord配置的新字段中。当用户设置架构时,它会为额外字段设置null。
DROPMALFORMED
:忽略整个损坏的记录。
FAILFAST:遇到损坏记录时引发异常。
如果您正在读取CSV
文件并希望删除与架构不匹配的行。您可以通过将选项mode
添加为DROPMALFORMED
来做到这一点
输入数据
a,1.0
b,2.2
c,xyz
d,4.5
e,asfsdfsdf
f,3.1
架构
val schema = StructType(Seq(
StructField("key", StringType, false),
StructField("value", DoubleType, false)
))
使用模式和选项读取csv文件
val df = spark.read.schema(schema)
.option("mode", "DROPMALFORMED")
.csv("/path to csv file ")
输出:
+-----+-----+
|key |value|
+-----+-----+
|hello|1.0 |
|hi |2.2 |
|how |3.1 |
|you |4.5 |
+-----+-----+
您可以在此处获得有关spark csv的更多详细信息
希望这有帮助!
输出为 预期输出
我收到的csv文件格式不正确(无法控制生成此csv的应用程序) CSV的标题和第一行如下所示: 这是我用来读取csv的代码: 这是我收到的输出: 第一个问题是奇怪的字符(可能缺少编码选项?)另外,标题是错误的,不能在该格式上使用DictReader,这对于编辑CSV非常有用。 我可以重写一个新的CSV与标题正确格式化,这不是一个问题,但我不知道如何跳过CSV的前3行!?或者我可以用CSV即将到来的
我在Azure上有一个Databricks5.3集群,它运行Apache Spark 2.4.0和Scala 2.11。 我不是Java/Scala开发人员,也不熟悉Spark/Databricks。我找不到Spark用来解析值的datetime格式化程序。 我尝试的模式:
问题内容: 我正在使用以下代码解压缩并保存CSV文件: 似乎一切正常,除了文件中的第一个字符是意外的事实。谷歌搜索似乎表明这是由于文件中的BOM。 我已经读过,将内容编码为utf-8-sig应该可以解决此问题。但是,添加: 到csv.reader中的f失败并显示: 如何删除BOM表并将其内容保存在正确的utf-8中? 问题答案: 首先,您需要解码文件内容,而不是对其进行编码。 其次,该模块不喜欢P
问题内容: 我试图在每次单击按钮时将对象(类)添加到文件中。然后,我尝试从文件中读取所有对象,并将其加载到。但是我在反序列化函数的第二次迭代中得到了一个。我已经验证了第一次迭代可以正常工作。 我读过许多关于同一问题的其他文章: 一种建议反序列化多个Java对象是将整个数组列表而不是单个对象写入文件。但这似乎很浪费,因为每次单击按钮时我都需要写入文件,并且随着时间的推移将有数百个对象。 另一个建议S
Scala\u Spark\u DataFrameReader\u csv的文档表明,Spark可以记录在读取时检测到的格式错误的行。csv文件 -如何记录格式错误的行 -是否可以获取包含格式错误行的val或var? 链接文档中的选项是:maxMalformedLogPerPartition(默认值10):设置Spark将为每个分区记录的最大错误行数。超过此数字的格式错误记录将被忽略