当前位置: 首页 > 知识库问答 >
问题:

通过Spark读取csv文件时出现问题

谢善
2023-03-14
"[{"code": "100", "name": "CLS1", "type": "PRIMARY"}]"

共有1个答案

孙宏扬
2023-03-14

您不能使用Spark CSV reader直接读取这样的列,因为无法区分作为json字符串一部分的逗号和列分隔符,而且出于同样的原因,您不能使用作为引号字符。

要解决这个问题,您应该将CSV文件作为文本文件读取,将String拆分seq[String],然后从此seq[String]重新创建列

例如,如果您有一个名为file.csv的文件,其内容如下:

column1, column2, column3, column4
1, "value1", "[{"code": "100", "name": "CLS1", "type": "PRIMARY"}]", 1.1
2, "value2", "[{"code": "200", "name": "CLS2", "type": "SECONDARY"}]", 1.2
import org.apache.spark.sql.functions.col
import sparkSession.implicits._

val finalDataframe = sparkSession.read.text("file.csv")
  .filter(col("value").notEqual("column1, column2, column3, column4"))
  .as[String]
  .map(splitRow)
  .withColumn("column1", col("value").getItem(0).cast("int"))
  .withColumn("column2", col("value").getItem(1).cast("string"))
  .withColumn("column3", col("value").getItem(2).cast("string"))
  .withColumn("column4", col("value").getItem(3).cast("float"))
  .drop("value")

其中splitrow函数接受字符串作为输入并返回seq[String],每个字符串按顺序表示一个列值。

此方法的实现可以是:

def splitRow(row: String): Seq[String] = row.foldLeft(Accumulator())((acc, char) => char match {
  case ',' if !acc.openBracket => acc.copy(result = acc.buffer +: acc.result, buffer = "", previousIsHyphen = false)
  case '[' if acc.previousIsHyphen => acc.copy(buffer = "[", openBracket = true, previousIsHyphen = false)
  case '"' if !acc.openBracket => acc.copy(previousIsHyphen = true)
  case '"' if acc.buffer.last == ']' => acc.copy(openBracket = false)
  case _ => acc.copy(buffer = acc.buffer + char, previousIsHyphen = false)
}).flush()

使用以下case类作为累加器:

case class Accumulator(
  result: Seq[String] = Nil, 
  openBracket: Boolean = false, 
  buffer: String = "", 
  previousIsHyphen: Boolean = false
) {
  def flush(): Seq[String] = (buffer +: result).reverse
}
+-------+-------+------------------------------------------------------+-------+
|column1|column2|column3                                               |column4|
+-------+-------+------------------------------------------------------+-------+
|1      | value1|[{"code": "100", "name": "CLS1", "type": "PRIMARY"}]  |1.1    |
|2      | value2|[{"code": "200", "name": "CLS2", "type": "SECONDARY"}]|1.2    |
+-------+-------+------------------------------------------------------+-------+
 类似资料:
  • 我是Python新手,一直在关注我在网上找到的关于解决中国邮递员问题的教程。 我上传了所需的CSV文件,但每当我试图定义用于打印的节点位置数据结构时,它总是说“KeyError:'X”“”,这是我的CSV文件的标题之一 一位朋友告诉我,可能的问题是CSV文件中有空格,但我不确定如何解决这个问题。 我试过使用在线教程中的文件,效果很好,所以我不确定我做错了什么。 是打印(df)运行时得到的输出类型

  • > 我运行spark shell,如下所示: spark-shell--jars.\spark-csv2.11-1.4.0.jar;.\commons-csv-1.2.jar(我不能直接下载这些依赖项,这就是我使用--jars的原因) 使用以下命令读取csv文件: 在执行第一个建议的解决方案后:

  • 我试图读取CSV文件,但它抛出了一个错误。我无法理解我的语法有什么问题,或者我是否需要向我的read_csv添加更多属性。 我试了一下这个解决办法 UnicodeDecodeError:“utf-8”编解码器无法解码位置21中的字节0x96:起始字节也无效。但它不起作用 [错误] UnicodeDecodeError回溯(最近一次调用)pandas/_libs/解析器。大熊猫中的pyx_图书馆。解

  • 我在apache Spark中读取本地文件时出错。scala>val f=sc.textfile(“/home/cloudera/downloads/sample.txt”)

  • 我正在尝试将压缩的csv文件(.bz2)读取为数据帧。我的代码如下 当我在IDE中尝试时,这是可行的。我可以读取数据并对其进行处理,但当我尝试使用maven构建数据并在命令行上运行它时,会出现以下错误 我不确定我是否在这里错过了什么。读取csv文件是否有一些依赖项?根据留档,Spark 2. x. x内置了对此的支持。

  • 我得到了一个CSV文件和一个头文件,它必须通过Spark(2.0.0和Scala2.11.8)作为数据frame读取。 是否有任何方法可以使用spark代码仅从CSV头中转义特殊字符?