当前位置: 首页 > 知识库问答 >
问题:

使用Spark读取CSV

毋宏茂
2023-03-14

我正在通过Spark使用以下命令读取csv文件。

rdd=sc.textFile("emails.csv").map(lambda line: line.split(","))

我需要创建一个Spark DataFrame。

我使用以下方法将此rdd转换为spark df:

dataframe=rdd.toDF()

但是在将rdd转换为df时,我需要指定df的模式。我试着这样做:(我只有两列文件和消息)

from pyspark import Row

email_schema=Row('file','message')

email_rdd=rdd.map(lambda r: email_schema(*r))

dataframe=sqlContext.createDataFrame(email_rdd)

然而,我得到了一个错误:java。lang.IllegalStateException:输入行没有架构所需的预期值数。需要2个字段,但提供1个值。

我还尝试使用以下方法读取我的csv文件:

rdd=sc.textFile("emails.csv").map(lambda line: line.split(",")).map(lambda line: line(line[0],line[1]))

我收到错误:TypeError:“list”对象不可调用

我尝试使用熊猫将我的csv文件读取到熊猫数据帧中,然后将其转换为触发DataFrame,但我的文件太大了。

我还补充道:

bin/pyspark --packages com.databricks:spark-csv_2.10:1.0.3

并使用以下命令读取我的文件:

df=sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('emails.csv')

我得到了一个错误:java。io。IOException:(startline 1)在封装令牌完成之前达到EOF

我已经浏览了其他几个相关的线程并尝试了如上所述。有人能解释一下我哪里出错了吗?

[在MacOSX上使用Python 2.7、Spark 1.6.2]

已编辑:

前3行如下。我只需要提取电子邮件的内容。我该怎么做?

1 allen-p/_sent_mail/1."Message-ID:

这是我们的预测”

2 allen-p/\u发送邮件/10。“邮件ID:

去参加一个商务会议会给旅行带来乐趣。尤其是当你必须准备演讲的时候。我建议在这里举行商业计划会议,然后在没有任何正式商业会议的情况下旅行。我甚至会尝试就旅行是理想的还是必要的获取一些诚实的意见。

至于商务会议,我认为尝试激发不同群体之间关于什么有效、什么无效的讨论会更有成效。太多时候,演讲者在发言,其他人安静地等待轮到他们。如果以圆桌讨论的形式举行会议可能会更好。

我的建议是去奥斯汀。打高尔夫球,租一艘滑雪艇和摩托艇。去某个地方飞行需要太多时间。“”

3 allen-p/\u sent\u mail/100。“邮件ID:

测试成功。加油!!!“”

共有2个答案

呼延庆
2023-03-14

如果您有一个大文件,为什么不使用大块的熊猫数据帧,而不是一次加载所有文件,比如:

import pandas as pd
df_pd = pd.read_csv('myfilename.csv',chunksize = 10000)

for i,chunk in enumerate(df1):
    if i==0:
        df_spark = sqlContext.createDataFrame(chunk)
    else:
        df_spark = df_spark.unionAll(sqlContext.createDataFrame(chunk))

df\U spark将是您所需的spark数据帧。这是低效的,但它会起作用。有关实现相同功能的其他方法,可以参考此问题的答案

另一种可能的方法是使用rdd的inferSchema方法,但您需要在csv文件中有列名才能工作,请参阅此。因此,您可以执行以下操作:

srdd = inferSchema(rdd)
email_rdd=rdd.map(lambda r: srdd(*r))

dataframe=sqlContext.createDataFrame(email_rdd)
彭胡媚
2023-03-14

如果RDD适合内存,则:

rdd.toPandas().to_csv('emails.csv')

如果不是,请将spark csv用于您的spark版本:

rdd.write.format('com.databricks.spark.csv').save('emails.csv')

在您上面的示例中:

rdd=....map(lambda line: line.split(",")).map(lambda line: line(line[0],line[1]))

难道你不想:

rdd=....map(lambda line: line.split(",")).map(lambda line: (line[0], line[1]))
 类似资料:
  • 我正在尝试使用spack-csv从spack-shell中的aws s3读取csv。 下面是我所做的步骤。使用下面的命令启动spack-shell 箱子/火花壳——包装com。数据块:spark-csv\u 2.10:1.2.0 在shell中,执行以下scala代码 获取以下错误 我在这里错过了什么?请注意,我可以使用 同样的scala代码在databricks笔记本中也可以正常工作 在spar

  • 我在尝试使用Spark简单读取CSV文件时遇到了这个问题。在这样的操作之后,我想确保: 数据类型是正确的(使用提供的模式) 根据提供的架构,标头是正确的 这是我使用的代码,并且有问题: 类型为产品类型,即案例类。这是可行的,但它不会检查列名是否正确,因此我可以提供另一个文件,只要数据类型正确,就不会发生错误,而且我不知道用户提供了错误的文件,但由于某种程度上的巧合,正确的数据类型具有正确的顺序。

  • 我正在尝试使用spark阅读Kafka,但我想我会遇到一些图书馆相关的问题。 线程“main”org.apache.spark.sql.AnalysisException中出现异常:找不到数据源:Kafka。请按照“结构化流媒体+Kafka集成指南”的部署部分部署应用程序。;在org.apache.spark.sql.execution.datasources.datasource$.lookup

  • 问题内容: 我在通过火花流从天蓝色斑点读取数据时遇到问题 上面的代码适用于HDFS,但无法从Azure blob读取文件 上面是在azure UI中显示的路径,但是这行不通,我是否丢失了某些内容,以及如何访问它。 我知道Eventhub是流数据的理想选择,但是我目前的情况要求使用存储而不是队列 问题答案: 为了从Blob存储中读取数据,需要完成两件事。首先,您需要告诉Spark在基础Hadoop配

  • 问题内容: 嗨,我有很多需要分类的图像(下百万)。我正在使用Spark,并设法以大RDD格式读取所有图像。 但是,我真的很困惑如何处理图像的unicode表示。 这是一个图像/文件的示例: 仔细看,实际上有些字符看起来像元数据 我以前的经验是使用包scipy和相关功能(例如“ imread”),并且输入通常是文件名。现在,我真的迷失了那些unicode的含义,以及如何将其转换为我熟悉的格式。 谁能

  • 我在apache Spark中读取本地文件时出错。scala>val f=sc.textfile(“/home/cloudera/downloads/sample.txt”)