当前位置: 首页 > 知识库问答 >
问题:

如何从Scala Spark DataFrameReader csv记录格式错误的行

姬银龙
2023-03-14

Scala\u Spark\u DataFrameReader\u csv的文档表明,Spark可以记录在读取时检测到的格式错误的行。csv文件
-如何记录格式错误的行
-是否可以获取包含格式错误行的val或var?

链接文档中的选项是:maxMalformedLogPerPartition(默认值10):设置Spark将为每个分区记录的最大错误行数。超过此数字的格式错误记录将被忽略

共有3个答案

傅和璧
2023-03-14

我在这里通过加载csv、从未损坏的记录中创建模式、添加损坏的记录列、使用新模式加载csv,然后查找损坏的记录来扩展clucar的答案。

from pyspark.sql.types import StructField, StringType
from pyspark.sql.functions import col

file_path = "/path/to/file"
mode = "PERMISSIVE"

schema = spark.read.options(mode=mode).csv(file_path).schema
schema = schema.add(StructField("_corrupt_record", StringType(), True))

df = spark.read.options(mode=mode).schema(schema).csv(file_path)
df.cache()
df.count()
df.filter(col("_corrupt_record").isNotNull()).show()
李联
2023-03-14

如果你正在使用火花2.3检查_corrupt_error特殊列...根据几个火花讨论“它应该工作”,所以在读取过滤器后,那些非空的科勒-应该有你的错误...你也可以检查input_file_name()sql func

如果未使用低于2.3版的版本,则应实现自定义读取、记录解决方案,因为根据我的测试,csv数据源不存在\u corrupt\u错误。。。

方永贞
2023-03-14

基于此databricks示例,在读取文件时,需要将“\u corrupt\u record”列显式添加到架构定义中。在pyspark 2.4.4中,类似的内容对我很有用:

from pyspark.sql.types import *

my_schema = StructType([
  StructField("field1", StringType(), True),
  ...
  StructField("_corrupt_record", StringType(), True)
])

my_data = spark.read.format("csv")\
  .option("path", "/path/to/file.csv")\
  .schema(my_schema)
  .load()

my_data.count()  # force reading the csv

corrupt_lines = my_data.filter("_corrupt_record is not NULL")
corrupt_lines.take(5)
 类似资料:
  • 问题内容: 目前,我正在开发一项功能,该功能涉及解析从另一产品收到的XML。我决定对一些实际的客户数据进行一些测试,看起来其他产品正在允许来自用户的输入被认为是无效的。无论如何,我仍然必须尝试找出一种解析它的方法。我们正在使用,但输入出现错误,如下所示。 如你所知,说明中包含似乎是无效标签的内容。现在,此描述标签被称为是叶子标签,并且其中不应包含任何嵌套标签。无论如何,这仍然是一个问题,并且会在

  • 通过@HystrixCommand注释,可以配置一个回退方法,在方法失败的情况下运行该方法。

  • 错误:getMore命令失败:{“operationtime”:Timestamp(1547144095,335),“ok”:0,“errmsg”:“未能在$convert中解析objectId”,没有onError值:>解析到OID的字符串长度无效,预期为24,但找到0“,”code“:241,”codename“:”conversionfailure“,”$clustertime“:{”clu

  • 我对电话号码格式有疑问。如果我试图读取Android设备上的通话记录,我得到两种格式的号码。国际和短版本。我一直认为这种行为会受到联系人存储在联系人中的格式的影响。但我已经不确定了。 是否有可能手机未从提供商处获得完整格式?这样它就不能给我了?如果是这种情况,是否存在某些规范,在哪些条件下提供商不提供完整格式?例如,如果呼叫在同一国家/地区内? 或者从提供商那里,手机一直都是全格式的,Androi

  • 问题内容: 我想在PHP CodeIgniter中记录错误。如何启用错误记录? 我有一些疑问: 记录错误的所有步骤是什么? 如何创建错误日志文件? 如何将错误消息推送到日志文件中(无论何时发生错误)? 您如何通过电子邮件将该错误发送到电子邮件地址? 问题答案: CodeIgniter内置了一些错误记录功能。 使您的 / application / logs 文件夹可写 在 /application

  • 我试图使用结构化流方法,使用基于DataFrame/DataSet API的Spark-Streaming来加载来自Kafka的数据流。 我使用: 火花2.10 Kafka0.10 SPARK-SQL-KAFKA-0-10