我知道如何使用spark csv将csv文件读入spark(https://github.com/databricks/spark-csv),但我已经将csv文件表示为字符串,并希望将此字符串直接转换为dataframe。这可能吗?
公认的答案在spark 2.2.0中对我不起作用,但可以通过csvData找到我需要的东西。线toList
val fileUrl = getClass.getResource(s"/file_in_resources.csv")
val stream = fileUrl.getContent.asInstanceOf[InputStream]
val streamString = Source.fromInputStream(stream).mkString
val csvList = streamString.lines.toList
spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(csvList.toDS())
.as[SomeCaseClass]
您可以使用,例如scala csv,将字符串解析为csv:
val myCSVdata:Array[列表[字符串]]=myCSVString。拆分(“\n”)。平面图(CSVParser.parseLine(_))
在这里,您可以进行更多的处理、数据清理、验证每一行是否解析良好以及是否具有相同数量的字段等。。。
然后,您可以将其作为记录的RDD:
val myCSVRDD: RDD[List[String]]=parkContext.parallelize(msCSVdata)
在这里,您可以将字符串列表转换为case类,以更好地反映csv数据的字段。在本例中,您应该从Person的创作中获得一些灵感:
https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection
我省略了这一步。
然后可以转换为数据帧:
导入spark。隐含。\umyCSVDataframe=myCSVRDD。toDF()
更新:从Spark 2.2开始。最后,有一种正确的方法可以使用Dataset来完成。
import org.apache.spark.sql.{Dataset, SparkSession}
val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()
import spark.implicits._
val csvData: Dataset[String] = spark.sparkContext.parallelize(
"""
|id, date, timedump
|1, "2014/01/01 23:00:01",1499959917383
|2, "2014/11/31 12:40:32",1198138008843
""".stripMargin.lines.toList).toDS()
val frame = spark.read.option("header", true).option("inferSchema",true).csv(csvData)
frame.show()
frame.printSchema()
旧火花版本
实际上你可以,尽管它使用的是库的内部结构,并且没有被广泛宣传。只需创建和使用您自己的CsvParser实例。下面的示例适用于我的Spark 1.6.0和Spark-csv_2.10-1.4.0
import com.databricks.spark.csv.CsvParser
val csvData = """
|userid,organizationid,userfirstname,usermiddlename,userlastname,usertitle
|1,1,user1,m1,l1,mr
|2,2,user2,m2,l2,mr
|3,3,user3,m3,l3,mr
|""".stripMargin
val rdd = sc.parallelize(csvData.lines.toList)
val csvParser = new CsvParser()
.withUseHeader(true)
.withInferSchema(true)
val csvDataFrame: DataFrame = csvParser.csvRdd(sqlContext, rdd)
问题内容: 我有一个包含字母数字键的数据框,我想另存为csv并在以后读取。由于种种原因,我需要以字符串格式显式读取此键列,所以我使用的键严格地是数字的,甚至更糟,例如:1234E5,Pandas会将其解释为浮点数。这显然使密钥完全无用。 问题是,当我为数据框或其中的任何列指定字符串dtype时,我只会得到垃圾回收。我在这里有一些示例代码: 数据框如下所示: 然后我像这样阅读: 结果是: 这是我的计
场景是:EventHub- 文件格式:CSV(带引号、管道分隔和自定义架构) 我正在尝试读取来自eventhub的CSV字符串。Spark成功地使用正确的模式创建了数据框,但在每条消息之后,数据框最终都是空的。 我设法在流媒体环境之外做了一些测试,当从文件中获取数据时,一切都很顺利,但当数据来自字符串时,一切都失败了。 所以我找到了一些链接来帮助我,但没有一个工作: can-i-read-a-cs
我正在尝试使用spack-csv从spack-shell中的aws s3读取csv。 下面是我所做的步骤。使用下面的命令启动spack-shell 箱子/火花壳——包装com。数据块:spark-csv\u 2.10:1.2.0 在shell中,执行以下scala代码 获取以下错误 我在这里错过了什么?请注意,我可以使用 同样的scala代码在databricks笔记本中也可以正常工作 在spar
问题内容: 我正在尝试使用csv文件读取文件,但某些字段是包含逗号的字符串。字符串用引号引起来,但是numpy不能将引号识别为定义了单个字符串。例如,使用“ t.csv”中的数据: 编码 产生错误: ValueError:检测到一些错误!第2行(获得4列而不是3列) 我正在寻找的数据结构是: 查看文档,我看不到任何解决方案。有没有办法用numpy做到这一点,或者我只需要使用模块读入数据,然后将其转
我正在通过Spark使用以下命令读取csv文件。 我需要创建一个Spark DataFrame。 我使用以下方法将此rdd转换为spark df: 但是在将rdd转换为df时,我需要指定df的模式。我试着这样做:(我只有两列文件和消息) 然而,我得到了一个错误:java。lang.IllegalStateException:输入行没有架构所需的预期值数。需要2个字段,但提供1个值。 我还尝试使用以
问题内容: 我想在使用Python的模块在Python数据结构和csv表示形式之间来回切换时区分和空字符串。 我的问题是,当我运行时: 我得到以下输出: 当然,我可以使用和区分和清空字符串,例如: 但这会部分破坏我对模块的兴趣(在C中实现快速反序列化/串行化,尤其是在处理大型列表时)。 是否有一个或参数,并能够使他们之间的区别,并在此用例? 如果不是,是否有兴趣实施补丁以实现这种来回交互?(可能是