当前位置: 首页 > 知识库问答 >
问题:

我可以使用spack-csv将表示为字符串的CSV读取到Apache Spark中吗

臧增
2023-03-14

我知道如何使用spark csv将csv文件读入spark(https://github.com/databricks/spark-csv),但我已经将csv文件表示为字符串,并希望将此字符串直接转换为dataframe。这可能吗?

共有3个答案

景令秋
2023-03-14

公认的答案在spark 2.2.0中对我不起作用,但可以通过csvData找到我需要的东西。线toList

val fileUrl = getClass.getResource(s"/file_in_resources.csv")
val stream = fileUrl.getContent.asInstanceOf[InputStream]
val streamString = Source.fromInputStream(stream).mkString

val csvList = streamString.lines.toList

spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvList.toDS())
  .as[SomeCaseClass]  
鞠建安
2023-03-14

您可以使用,例如scala csv,将字符串解析为csv:

val myCSVdata:Array[列表[字符串]]=myCSVString。拆分(“\n”)。平面图(CSVParser.parseLine(_))

在这里,您可以进行更多的处理、数据清理、验证每一行是否解析良好以及是否具有相同数量的字段等。。。

然后,您可以将其作为记录的RDD:

val myCSVRDD: RDD[List[String]]=parkContext.parallelize(msCSVdata)

在这里,您可以将字符串列表转换为case类,以更好地反映csv数据的字段。在本例中,您应该从Person的创作中获得一些灵感:

https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection

我省略了这一步。

然后可以转换为数据帧:

导入spark。隐含。\umyCSVDataframe=myCSVRDD。toDF()

仉伟兆
2023-03-14

更新:从Spark 2.2开始。最后,有一种正确的方法可以使用Dataset来完成。

import org.apache.spark.sql.{Dataset, SparkSession}
val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()

import spark.implicits._
val csvData: Dataset[String] = spark.sparkContext.parallelize(
  """
    |id, date, timedump
    |1, "2014/01/01 23:00:01",1499959917383
    |2, "2014/11/31 12:40:32",1198138008843
  """.stripMargin.lines.toList).toDS()

val frame = spark.read.option("header", true).option("inferSchema",true).csv(csvData)
frame.show()
frame.printSchema()

旧火花版本

实际上你可以,尽管它使用的是库的内部结构,并且没有被广泛宣传。只需创建和使用您自己的CsvParser实例。下面的示例适用于我的Spark 1.6.0和Spark-csv_2.10-1.4.0

    import com.databricks.spark.csv.CsvParser

val csvData = """
|userid,organizationid,userfirstname,usermiddlename,userlastname,usertitle
|1,1,user1,m1,l1,mr
|2,2,user2,m2,l2,mr
|3,3,user3,m3,l3,mr
|""".stripMargin
val rdd = sc.parallelize(csvData.lines.toList)
val csvParser = new CsvParser()
  .withUseHeader(true)
  .withInferSchema(true)


val csvDataFrame: DataFrame = csvParser.csvRdd(sqlContext, rdd)
 类似资料:
  • 问题内容: 我有一个包含字母数字键的数据框,我想另存为csv并在以后读取。由于种种原因,我需要以字符串格式显式读取此键列,所以我使用的键严格地是数字的,甚至更糟,例如:1234E5,Pandas会将其解释为浮点数。这显然使密钥完全无用。 问题是,当我为数据框或其中的任何列指定字符串dtype时,我只会得到垃圾回收。我在这里有一些示例代码: 数据框如下所示: 然后我像这样阅读: 结果是: 这是我的计

  • 场景是:EventHub- 文件格式:CSV(带引号、管道分隔和自定义架构) 我正在尝试读取来自eventhub的CSV字符串。Spark成功地使用正确的模式创建了数据框,但在每条消息之后,数据框最终都是空的。 我设法在流媒体环境之外做了一些测试,当从文件中获取数据时,一切都很顺利,但当数据来自字符串时,一切都失败了。 所以我找到了一些链接来帮助我,但没有一个工作: can-i-read-a-cs

  • 我正在尝试使用spack-csv从spack-shell中的aws s3读取csv。 下面是我所做的步骤。使用下面的命令启动spack-shell 箱子/火花壳——包装com。数据块:spark-csv\u 2.10:1.2.0 在shell中,执行以下scala代码 获取以下错误 我在这里错过了什么?请注意,我可以使用 同样的scala代码在databricks笔记本中也可以正常工作 在spar

  • 问题内容: 我正在尝试使用csv文件读取文件,但某些字段是包含逗号的字符串。字符串用引号引起来,但是numpy不能将引号识别为定义了单个字符串。例如,使用“ t.csv”中的数据: 编码 产生错误: ValueError:检测到一些错误!第2行(获得4列而不是3列) 我正在寻找的数据结构是: 查看文档,我看不到任何解决方案。有没有办法用numpy做到这一点,或者我只需要使用模块读入数据,然后将其转

  • 我正在通过Spark使用以下命令读取csv文件。 我需要创建一个Spark DataFrame。 我使用以下方法将此rdd转换为spark df: 但是在将rdd转换为df时,我需要指定df的模式。我试着这样做:(我只有两列文件和消息) 然而,我得到了一个错误:java。lang.IllegalStateException:输入行没有架构所需的预期值数。需要2个字段,但提供1个值。 我还尝试使用以

  • 问题内容: 我想在使用Python的模块在Python数据结构和csv表示形式之间来回切换时区分和空字符串。 我的问题是,当我运行时: 我得到以下输出: 当然,我可以使用和区分和清空字符串,例如: 但这会部分破坏我对模块的兴趣(在C中实现快速反序列化/串行化,尤其是在处理大型列表时)。 是否有一个或参数,并能够使他们之间的区别,并在此用例? 如果不是,是否有兴趣实施补丁以实现这种来回交互?(可能是