场景是:EventHub-
文件格式:CSV(带引号、管道分隔和自定义架构)
我正在尝试读取来自eventhub的CSV字符串。Spark成功地使用正确的模式创建了数据框,但在每条消息之后,数据框最终都是空的。
我设法在流媒体环境之外做了一些测试,当从文件中获取数据时,一切都很顺利,但当数据来自字符串时,一切都失败了。
所以我找到了一些链接来帮助我,但没有一个工作:
can-i-read-a-csv-represented-as-a-string-into-apache-spark-using-spark-csv?rq=1
Pyspark-将json字符串转换为数据帧
现在我有以下代码:
schema = StructType([StructField("Decisao",StringType(),True), StructField("PedidoID",StringType(),True), StructField("De_LastUpdated",StringType(),True)])
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])
df = spark.read \
.option("header", "true") \
.option("mode","FAILFAST") \
.option("delimiter","|") \
.schema(schema) \
.csv(csvData)
df.show()
CSV文件也可以这样做吗?
您可以通过|
分隔符上的行
和拆分
来构造这样的模式
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])
schemaDF = csvData\
.map(lambda x: x.split("|"))\
.map(lambda x: Row(x[0],\
x[1],\
x[2],\
x[3],\
x[4]))\
.toDF(["Decisao", "PedidoID", "De_LastUpdated", "col4", "col5"])
for i in schemaDF.take(1): print(i)
Row(Decisao='DECISAO', PedidoID='PEDIDOID', De_LastUpdated='DE_LASTUPDATED\r\n"asdasdas"', col4='"1015905177"', col5='"sdfgsfgd"')
schemaDF.printSchema()
root
|-- Decisao: string (nullable = true)
|-- PedidoID: string (nullable = true)
|-- De_LastUpdated: string (nullable = true)
|-- col4: string (nullable = true)
|-- col5: string (nullable = true)
pyspark新手,希望将csv文件读取到数据帧。似乎不能让人读。有什么帮助吗? ()中的Py4JJavaError回溯(最近一次调用)----
我尝试使用pyspark读取csv文件,并使用以下pyspark代码: 但是我得到了这个错误: 在collect(self)532中使用SCCallSiteSync(self._sc)作为CSS:533 sock_info=self._jdf.collectTopython()-->534返回列表(_load_from_socket(sock_info,BatchedSerializer()))5
我正在通过Spark使用以下命令读取csv文件。 我需要创建一个Spark DataFrame。 我使用以下方法将此rdd转换为spark df: 但是在将rdd转换为df时,我需要指定df的模式。我试着这样做:(我只有两列文件和消息) 然而,我得到了一个错误:java。lang.IllegalStateException:输入行没有架构所需的预期值数。需要2个字段,但提供1个值。 我还尝试使用以
当我尝试导入带有火花的本地CSV时,默认情况下每个列都作为字符串读取。但是,我的列只包括整数和时间戳类型。更具体地说,CSV如下所示: 我已经找到了这个问题中应该有效的代码,但当我执行它时,所有条目都返回为NULL。 我使用以下内容来创建自定义架构: 然后使用以下命令读取CSV: 返回: 我是否错过了关键的一步?我怀疑Date列是问题的根源。注意:我在GoogleCollab中运行这个。
我不熟悉Python及其库pyspark,我需要进行一些POC,其中我需要读取来自上游的CSV文件。我收到的CSV文件没有任何分隔符,它将是一个基于位置的文件。我们可以在Oracle控制文件中执行此操作,在该文件中,我们可以定义每个列的位置,并检索我在pyspark中执行此操作所需的值。 我正在使用Apache Spark来处理我的Pyspark或python代码。 对于Ex。 TXT文件中的两行
我是python编程/数据科学家领域的新手。我使用Pycharm和MacOs。出于学习目的,我从Kaggle下载了一些CSV文件,我总是能够将它们加载到Pycharm中。但实际上我无法从我的工作环境中加载csv文件,令人惊讶的是,我可以使用R函数'fread'加载该文件,但我的目标是将其加载到python中。请在下面查找代码和错误: 代码: 错误: “/Users/oscargonzalez-ll