"[{"code": "100", "name": "CLS1", "type": "PRIMARY"}]"
您不能使用Spark CSV reader直接读取这样的列,因为无法区分作为json字符串一部分的逗号和列分隔符,而且出于同样的原因,您不能使用“
作为引号字符。
要解决这个问题,您应该将CSV文件作为文本文件读取,将String
行拆分为seq[String]
,然后从此seq[String]
重新创建列
例如,如果您有一个名为file.csv
的文件,其内容如下:
column1, column2, column3, column4
1, "value1", "[{"code": "100", "name": "CLS1", "type": "PRIMARY"}]", 1.1
2, "value2", "[{"code": "200", "name": "CLS2", "type": "SECONDARY"}]", 1.2
import org.apache.spark.sql.functions.col
import sparkSession.implicits._
val finalDataframe = sparkSession.read.text("file.csv")
.filter(col("value").notEqual("column1, column2, column3, column4"))
.as[String]
.map(splitRow)
.withColumn("column1", col("value").getItem(0).cast("int"))
.withColumn("column2", col("value").getItem(1).cast("string"))
.withColumn("column3", col("value").getItem(2).cast("string"))
.withColumn("column4", col("value").getItem(3).cast("float"))
.drop("value")
其中splitrow
函数接受字符串
作为输入并返回seq[String]
,每个字符串
按顺序表示一个列值。
此方法的实现可以是:
def splitRow(row: String): Seq[String] = row.foldLeft(Accumulator())((acc, char) => char match {
case ',' if !acc.openBracket => acc.copy(result = acc.buffer +: acc.result, buffer = "", previousIsHyphen = false)
case '[' if acc.previousIsHyphen => acc.copy(buffer = "[", openBracket = true, previousIsHyphen = false)
case '"' if !acc.openBracket => acc.copy(previousIsHyphen = true)
case '"' if acc.buffer.last == ']' => acc.copy(openBracket = false)
case _ => acc.copy(buffer = acc.buffer + char, previousIsHyphen = false)
}).flush()
使用以下case类作为累加器
:
case class Accumulator(
result: Seq[String] = Nil,
openBracket: Boolean = false,
buffer: String = "",
previousIsHyphen: Boolean = false
) {
def flush(): Seq[String] = (buffer +: result).reverse
}
+-------+-------+------------------------------------------------------+-------+
|column1|column2|column3 |column4|
+-------+-------+------------------------------------------------------+-------+
|1 | value1|[{"code": "100", "name": "CLS1", "type": "PRIMARY"}] |1.1 |
|2 | value2|[{"code": "200", "name": "CLS2", "type": "SECONDARY"}]|1.2 |
+-------+-------+------------------------------------------------------+-------+
我是Python新手,一直在关注我在网上找到的关于解决中国邮递员问题的教程。 我上传了所需的CSV文件,但每当我试图定义用于打印的节点位置数据结构时,它总是说“KeyError:'X”“”,这是我的CSV文件的标题之一 一位朋友告诉我,可能的问题是CSV文件中有空格,但我不确定如何解决这个问题。 我试过使用在线教程中的文件,效果很好,所以我不确定我做错了什么。 是打印(df)运行时得到的输出类型
> 我运行spark shell,如下所示: spark-shell--jars.\spark-csv2.11-1.4.0.jar;.\commons-csv-1.2.jar(我不能直接下载这些依赖项,这就是我使用--jars的原因) 使用以下命令读取csv文件: 在执行第一个建议的解决方案后:
我试图读取CSV文件,但它抛出了一个错误。我无法理解我的语法有什么问题,或者我是否需要向我的read_csv添加更多属性。 我试了一下这个解决办法 UnicodeDecodeError:“utf-8”编解码器无法解码位置21中的字节0x96:起始字节也无效。但它不起作用 [错误] UnicodeDecodeError回溯(最近一次调用)pandas/_libs/解析器。大熊猫中的pyx_图书馆。解
我在apache Spark中读取本地文件时出错。scala>val f=sc.textfile(“/home/cloudera/downloads/sample.txt”)
我正在尝试将压缩的csv文件(.bz2)读取为数据帧。我的代码如下 当我在IDE中尝试时,这是可行的。我可以读取数据并对其进行处理,但当我尝试使用maven构建数据并在命令行上运行它时,会出现以下错误 我不确定我是否在这里错过了什么。读取csv文件是否有一些依赖项?根据留档,Spark 2. x. x内置了对此的支持。
我得到了一个CSV文件和一个头文件,它必须通过Spark(2.0.0和Scala2.11.8)作为数据frame读取。 是否有任何方法可以使用spark代码仅从CSV头中转义特殊字符?