当前位置: 首页 > 知识库问答 >
问题:

如何在spark中加载到数据帧时修剪字段?

爱炯
2023-03-14

我们最近收到了一个要摄取的文件,该文件是PSV格式的,但是,所有的字段都用额外的字符$~$填充在左右两边,所以整个PSV如下所示:

$ ~ $ field 1 $ ~ $ | $ ~ $ field 2 $ ~ $ | $ ~ $ field 3 $ ~ $

$~$Data1$~$|$~$Data 2$~$$|$$~$Data3$~$

$~$数据4 $ ~ $ | $ ~ $数据5 $ ~ $ | $ ~ $数据6$~$

$~$数据7 $ ~ $ | $ ~ $数据8 $ ~ $ | $ ~ $数据9$~$

$~$Data10$~$|$~$Data 11$~$$|$$~$数据12$~$。。。。。

文件中有1亿行。

什么是最好的方法来修整这些衬垫,使它成为一个标准的PSV?

非常感谢,任何建议/分享都在这里不胜感激。

更新:

数据从SFTP接收,并由IT数据支持(Unix Admin)上传到Hadoop,我们只能访问Hadoop集群,但如果这对数据支持来说是一项简单的工作,也许我可以说服他们进行预处理。谢谢

共有3个答案

朱炜
2023-03-14

使用regexp_replace和foldLeft更新所有列。看看这个

scala> val df = Seq(("$~$Data1$~$","$~$Data2$~$","$~$Data3$~$"), ("$~$Data4$~$","$~$Data5$~$","$~$Data6$~$"), ("$~$Data7$~$","$~$Data8$~$","$~$Data9$~$"),("$~$Data10$~$","$~$Data11$~$","$~$Data12$~$")).toDF("Field1","field2","field3")
df: org.apache.spark.sql.DataFrame = [Field1: string, field2: string ... 1 more field]

scala> df.show(false)
+------------+------------+------------+
|Field1      |field2      |field3      |
+------------+------------+------------+
|$~$Data1$~$ |$~$Data2$~$ |$~$Data3$~$ |
|$~$Data4$~$ |$~$Data5$~$ |$~$Data6$~$ |
|$~$Data7$~$ |$~$Data8$~$ |$~$Data9$~$ |
|$~$Data10$~$|$~$Data11$~$|$~$Data12$~$|
+------------+------------+------------+


scala> val df2 = df.columns.foldLeft(df) { (acc,x) => acc.withColumn(x,regexp_replace(col(x),"""^\$~\$|\$~\$$""","")) }
df2: org.apache.spark.sql.DataFrame = [Field1: string, field2: string ... 1 more field]

scala> df2.show(false)
+------+------+------+
|Field1|field2|field3|
+------+------+------+
|Data1 |Data2 |Data3 |
|Data4 |Data5 |Data6 |
|Data7 |Data8 |Data9 |
|Data10|Data11|Data12|
+------+------+------+


scala>
祁坚壁
2023-03-14

这是一个纯Spark解决方案。可能有性能更好的解决方案。

var df = spark.read.option("delimiter", "|").csv(filePath)
val replace = (value: String, find: String, replace: String) => value.replace(find, replace)
val replaceUdf = udf(replace)
df.select(
       df.columns.map(c => replaceUdf(col(c), lit("$~$"), lit("")).alias(c)): _*)
  .show

更新:在2.3.0中,您不能将$~$用作引号选项,也不能将$~$|$~$作为分隔符

阙项禹
2023-03-14

tr可能是更快的解决方案。请注意,您可以通过管道传输任何字符串,因此在本例中,我cating磁盘上的文件,但这也可以是来自sftp的文件流。

~/Desktop/test $ cat data.txt
$~$Field1$~$|$~$Field2$~$|$~$Field3$~$

$~$Data1$~$|$~$Data2$~$|$~$Data3$~$

$~$Data4$~$|$~$Data5$~$|$~$Data6$~$

$~$Data7$~$|$~$Data8$~$|$~$Data9$~$

# the '>' will open a new file for writing

~/Desktop/test $ cat data.txt | tr -d \$~\$ > output.psv

# see the results here
~/Desktop/test $ cat output.psv 
Field1|Field2|Field3

Data1|Data2|Data3

Data4|Data5|Data6

Data7|Data8|Data9

示例:https://shapeshed.com/unix-tr/#what-是unix中的tr命令

 类似资料:
  • 我正在实施一个项目,其中MySql数据被导入到hdfs使用sqoop。它有将近30张桌子。我通过推断模式和注册为临时表来读取每个表作为数据帧。我做这件事有几个问题...1.假设df1到df10的表需要实现几个连接。在MySQL中,查询将是而不是使用是否有其他连接所有数据帧有效地基于条件...

  • 此外,特别是对于它得到的字段(检查case class[2]) 如果我将case类[2]中的所有字段都定义为String类型,那么一切都很好,但这不是我想要的。有没有一个简单的方法做这件事[3]? 参考文献 [3]我已经找到了这样做的方法,首先在DataFrame级别上定义列,然后将事情转换为Dataset(比如here或here或here),但我几乎可以肯定,这不是应该做的事情。我也很确定编码器

  • 通常,人们会在打印输出(数据帧)时询问堆栈溢出问题。如果有一种方法可以将数据帧数据快速加载到对象中,那么这是很方便的。 从数据帧字符串(可能格式正确,也可能格式不正确)加载数据帧的最有建议的方法是什么? 如果要将以下字符串作为数据帧加载,您会怎么做? 此类型与您在文件中找到的更相似。 注意:以下两个链接不涉及示例-1中提出的具体情况。我认为我的问题不是重复的原因是,我认为不能使用已经发布在这些链接

  • 我需要将多个列附加到现有的spark dataframe,其中列名称在列表中给定,假设新列的值是常量,例如给定的输入列和dataframe是 并且在附加两列后,假设 col1 的常量值为“val1”,col2 的常量值为“val2”,则输出数据帧应为 我已经编写了一个函数来追加列 有没有更好的方式,更具功能性的方式去做。 谢啦

  • 我开始使用Spark DataFrames,我需要能够枢轴的数据,以创建多个列1列多行。在Scalding中有内置的功能,我相信Python中的熊猫,但是我找不到任何新的Spark Dataframe。 我假设我可以编写某种自定义函数来实现这一点,但我甚至不知道如何开始,特别是因为我是Spark的新手。如果有人知道如何使用内置功能或如何在Scala中编写东西的建议来实现这一点,我们将不胜感激。

  • 当我加载并运行下面的代码时,我会得到一个UnicodeDecodeError。如何解决这个问题? 我在我的文件夹中有一个CSV文件,但这种类型的错误即将出现,它不会显示输出中的头部列表。我使用Jupyter笔记本来运行代码以及编程。