Scala\u Spark\u DataFrameReader\u csv的文档表明,Spark可以记录在读取时检测到的格式错误的行。csv文件
-如何记录格式错误的行
-是否可以获取包含格式错误行的val或var?
链接文档中的选项是:maxMalformedLogPerPartition(默认值10):设置Spark将为每个分区记录的最大错误行数。超过此数字的格式错误记录将被忽略
我在这里通过加载csv、从未损坏的记录中创建模式、添加损坏的记录列、使用新模式加载csv,然后查找损坏的记录来扩展clucar的答案。
from pyspark.sql.types import StructField, StringType
from pyspark.sql.functions import col
file_path = "/path/to/file"
mode = "PERMISSIVE"
schema = spark.read.options(mode=mode).csv(file_path).schema
schema = schema.add(StructField("_corrupt_record", StringType(), True))
df = spark.read.options(mode=mode).schema(schema).csv(file_path)
df.cache()
df.count()
df.filter(col("_corrupt_record").isNotNull()).show()
如果你正在使用火花2.3检查_corrupt_error特殊列...根据几个火花讨论“它应该工作”,所以在读取过滤器后,那些非空的科勒-应该有你的错误...你也可以检查input_file_name()sql func
如果未使用低于2.3版的版本,则应实现自定义读取、记录解决方案,因为根据我的测试,csv数据源不存在\u corrupt\u错误。。。
基于此databricks示例,在读取文件时,需要将“\u corrupt\u record”列显式添加到架构定义中。在pyspark 2.4.4中,类似的内容对我很有用:
from pyspark.sql.types import *
my_schema = StructType([
StructField("field1", StringType(), True),
...
StructField("_corrupt_record", StringType(), True)
])
my_data = spark.read.format("csv")\
.option("path", "/path/to/file.csv")\
.schema(my_schema)
.load()
my_data.count() # force reading the csv
corrupt_lines = my_data.filter("_corrupt_record is not NULL")
corrupt_lines.take(5)
问题内容: 目前,我正在开发一项功能,该功能涉及解析从另一产品收到的XML。我决定对一些实际的客户数据进行一些测试,看起来其他产品正在允许来自用户的输入被认为是无效的。无论如何,我仍然必须尝试找出一种解析它的方法。我们正在使用,但输入出现错误,如下所示。 如你所知,说明中包含似乎是无效标签的内容。现在,此描述标签被称为是叶子标签,并且其中不应包含任何嵌套标签。无论如何,这仍然是一个问题,并且会在
通过@HystrixCommand注释,可以配置一个回退方法,在方法失败的情况下运行该方法。
错误:getMore命令失败:{“operationtime”:Timestamp(1547144095,335),“ok”:0,“errmsg”:“未能在$convert中解析objectId”,没有onError值:>解析到OID的字符串长度无效,预期为24,但找到0“,”code“:241,”codename“:”conversionfailure“,”$clustertime“:{”clu
我对电话号码格式有疑问。如果我试图读取Android设备上的通话记录,我得到两种格式的号码。国际和短版本。我一直认为这种行为会受到联系人存储在联系人中的格式的影响。但我已经不确定了。 是否有可能手机未从提供商处获得完整格式?这样它就不能给我了?如果是这种情况,是否存在某些规范,在哪些条件下提供商不提供完整格式?例如,如果呼叫在同一国家/地区内? 或者从提供商那里,手机一直都是全格式的,Androi
问题内容: 我想在PHP CodeIgniter中记录错误。如何启用错误记录? 我有一些疑问: 记录错误的所有步骤是什么? 如何创建错误日志文件? 如何将错误消息推送到日志文件中(无论何时发生错误)? 您如何通过电子邮件将该错误发送到电子邮件地址? 问题答案: CodeIgniter内置了一些错误记录功能。 使您的 / application / logs 文件夹可写 在 /application
我试图使用结构化流方法,使用基于DataFrame/DataSet API的Spark-Streaming来加载来自Kafka的数据流。 我使用: 火花2.10 Kafka0.10 SPARK-SQL-KAFKA-0-10