当前位置: 首页 > 知识库问答 >
问题:

压缩和分解Spark SQL数据帧中的多列

冷浩瀚
2023-03-14

我有以下结构的数据帧:

A: Array[String]   | B: Array[String] | [ ... multiple other columns ...]
=========================================================================
[A, B, C, D]       | [1, 2, 3, 4]     | [ ... array with 4 elements ... ]
[E, F, G, H, I]    | [5, 6, 7, 8, 9]  | [ ... array with 5 elements ... ]
[J]                | [10]             | [ ... array with 1 element ...  ]

我想写一个UDF,那个

    < li >压缩DF中每列第I个位置的元素 < li >分解每个压缩元组的DF

生成的列应如下所示:

ZippedAndExploded: Array[String]
=================================
[A, 1, ...]
[B, 2, ...]
[C, 3, ...]
[D, 4, ...]
[E, 5, ...]
[F, 6, ...]
[G, 7, ...]
[H, 8, ...]
[I, 9, ...]
[J, 10, ...]

目前,我正在对UDF使用多调用(每个列名一个,在运行时之前收集列名列表),如下所示:

val myudf6 = udf((xa:Seq[Seq[String]],xb:Seq[String]) => {
  xa.indices.map(i => {
    xa(i) :+ xb(i) // Add one element to the zip column
  })
})

val allColumnNames = df.columns.filter(...)    

for (columnName <- allColumnNames) {
  df = df.withColumn("zipped", myudf8(df("zipped"), df(columnName))
}
df = df.explode("zipped")

由于dataframe可能有数百列,因此这种对< code>withColumn的迭代调用似乎需要很长时间。

问题:这是否可能与一个UDF和一个<code>DF.withColumn(…)

重要提示:UDF应该压缩动态数量的列(在运行时读取)。

共有2个答案

西门良才
2023-03-14

如果您知道并确定您的数组中的值的数量下面可以是一个更简单的解决方案

select A[0], B[0]..... from your_table
union all
select A[1], B[1]..... from your_table
union all
select A[2], B[2]..... from your_table
union all
select A[3], B[3]..... from your_table
袁致远
2023-03-14

使用采用可变数量的列作为输入的 UDF。这可以通过数组数组来完成(假设类型相同)。由于您有一个数组数组,因此可以使用转置,这将获得与将列表压缩在一起相同的结果。然后,生成的数组可以分解。

val array_zip_udf = udf((cols: Seq[Seq[String]]) => {
  cols.transpose
})

val allColumnNames = df.columns.filter(...).map(col)
val df2 = df.withColumn("exploded", explode(array_zip_udf(array(allColumnNames: _*))))

请注意,在Spark 2.4中,可以使用< code>arrays_zip来代替< code>UDF:

val df2 = df.withColumn("exploded", explode(arrays_zip(allColumnNames: _*)))
 类似资料:
  • 问题内容: 我正在使用以下代码来压缩和解压缩字符串数据,但是我面临的问题是,它很容易被压缩而不会出错,但是decompress方法会引发以下错误。 线程“主”中的异常java.io.IOException:不是GZIP格式 仍然不知道如何解决此问题!!! 问题答案: 这是因为 发送您可以从中获得的,并在其中使用它来构建您的。以下是需要在代码中进行的更改。

  • 问题内容: 有人可以向我解释zlib库在Nodejs中如何工作吗? 我对Node.js很陌生,还不确定如何使用缓冲区和流。 我的简单情况是一个字符串变量,我想将字符串压缩或解压缩(压缩或膨胀,gzip或gunzip等)到另一个字符串。 即(我希望它如何工作) 感谢您的帮助:) 问题答案: 更新 :没意识到在节点0.5中有一个新的内置“ zlib”模块。我在下面的答案是针对第三方node- zlib

  • 问题内容: 我在想办法也无法在Java中解压缩String时遇到麻烦。这是我要学习的基本Java类,因此只需要基本命令,没有什么花哨的地方。它能够输入的目标 在命令提示符下,它将打印(就像它压缩参数字符串一样)。 另一个目标是输入 它将打印(就像它将解压缩参数String一样)。减压是我遇到的问题。这是我的代码,非常感谢我得到的任何帮助。 问题答案: Quick Code for you..

  • 问题内容: 我知道这是一项容易的任务,但是更改代码后它停止工作,并且无法恢复!我实际上使用了两个函数来进行压缩和解压缩,尽管实际上它是“ jar”和“ unjar”,但这并没有太大的区别 任何帮助/建议吗? 创建JarFile时发生错误: 问题答案: 我不知道这是否是您的问题,但是通常最好在完成写入后关闭每个zip条目。 请参阅。 在显示的代码中,不会关闭邮政编码中的最后一个条目。您也不会显示关闭

  • 主要内容:1. 压缩和解压缩介绍,2. 启用压缩,3. 启用解压缩,4. 发送压缩文件本节介绍如何配置响应的压缩或解压缩以及发送压缩文件。 在这篇文章中,涉及内容如下 - 压缩和解压缩介绍 启用压缩 启用解压缩 发送压缩文件 1. 压缩和解压缩介绍 压缩响应通常会显着减少传输数据的大小。 然而,由于压缩在运行时发生,它还可以增加相当大的处理开销,这会对性能产生负面影响 在向客户端发送响应之前,NGINX会执行压缩,但不会“压缩”已压缩的响应(例如,由代理的服务器)。 2. 启用压缩

  • 我的GCP云存储桶中有很多.tar文件。每个.tar文件都有多个图层。我想使用GCP数据流解压缩这些.tar文件,并将它们放回另一个GCP存储桶中。 我找到了Google提供的用于批量解压缩云存储文件的实用工具模板,但它不支持.tar文件扩展名。 也许我应该在上传到云端之前尝试解压文件,或者Beam中是否存在其他内容? 每个tar文件未经压缩大约有15 TB。