当前位置: 首页 > 知识库问答 >
问题:

在scala数据帧中合并映射

盛嘉
2023-03-14

我有一个列为col1、col2、col3的数据帧。col3是下面定义的映射[String,String]

 |-- col3: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

我按col1、col2进行分组,并使用collect_list进行聚合,以获得映射数组并存储在col4中。

 df.groupBy($"col1", $"col2").agg(collect_list($"col3").as("col4"))

 |-- col4: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

然而,我希望将col4作为一个单独的映射,并将所有映射合并。目前我有:

[[a->a1,b->b1],[c->c1]]

预期输出

[a->a1,b->b1,c->c1]

使用udf是否理想?

感谢您的帮助。谢谢

共有2个答案

宦烈
2023-03-14
匿名用户

没有UDF你也能实现。让我们创建您的数据框架:

val df = Seq(Seq(Map("a" -> "a1", "b" -> "b1"), Map("c" -> "c1", "d" -> "d1"))).toDF()
df.show(false)
df.printSchema()

输出:

+----------------------------------------+
|value                                   |
+----------------------------------------+
|[[a -> a1, b -> b1], [c -> c1, d -> d1]]|
+----------------------------------------+

root
 |-- value: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

如果数组包含2个元素,只需使用< code>map_concat:

df.select(map_concat('value.getItem(0), 'value.getItem(1))).show(false)

或者这样(我不知道如何动态地从0循环到“值数组类型”列大小,这可能是最短的解决方案)

df.select(map_concat((for {i <- 0 to 1} yield 'value.getItem(i)): _*)).show(false)

否则,如果数组包含多个映射,并且大小未知,则可以尝试以下方法:

  val df2 = df.map(s => {
    val list = s.getList[Map[String, String]](0)
    var map = Map[String, String]()
    for (i <- 0 to list.size() - 1) {
      map = map ++ list.get(i)
    }
    map
  })

  df2.show(false)
  df2.printSchema()

输出:

+------------------------------------+
|value                               |
+------------------------------------+
|[a -> a1, b -> b1, c -> c1, d -> d1]|
+------------------------------------+

root
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

戚弘和
2023-03-14

您可以使用聚合和map_concat:

import org.apache.spark.sql.functions.{expr, collect_list}

val df = Seq(
  (1, Map("k1" -> "v1", "k2" -> "v3")),
  (1, Map("k3" -> "v3")),
  (2, Map("k4" -> "v4")),
  (2, Map("k6" -> "v6", "k5" -> "v5"))
).toDF("id", "data")

val mergeExpr = expr("aggregate(data, map(), (acc, i) -> map_concat(acc, i))")

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeExpr.as("merged_data"))
  .show(false)

// +---+------------------------------+
// |id |merged_data                   |
// +---+------------------------------+
// |1  |[k1 -> v1, k2 -> v3, k3 -> v3]|
// |2  |[k4 -> v4, k6 -> v6, k5 -> v5]|
// +---+------------------------------+

使用<code>map_concat

注意:当前在Spark 2.4.5上实现的map_concat允许相同密钥共存。这很可能是一个bug,因为根据官方文档,这不是预期的行为。请注意这一点。

如果你想避免这种情况,你也可以去UDF:

import org.apache.spark.sql.functions.{collect_list, udf}

val mergeMapUDF = udf((data: Seq[Map[String, String]]) => data.reduce(_ ++ _))

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeMapUDF($"data").as("merged_data"))
  .show(false)

更新(2022-08-27)

  1. 在 Spark 3.3.0 中,上述代码不起作用,并引发以下异常:
AnalysisException: cannot resolve 'aggregate(`data`, map(), lambdafunction(map_concat(namedlambdavariable(), namedlambdavariable()), namedlambdavariable(), namedlambdavariable()), lambdafunction(namedlambdavariable(), namedlambdavariable()))' due to data type mismatch: argument 3 requires map<null,null> type, however, 'lambdafunction(map_concat(namedlambdavariable(), namedlambdavariable()), namedlambdavariable(), namedlambdavariable())' is of map<string,string> type.;
Project [id#110, aggregate(data#119, map(), lambdafunction(map_concat(cast(lambda acc#122 as map<string,string>), lambda i#123), lambda acc#122, lambda i#123, false), lambdafunction(lambda id#124, lambda id#124, false)) AS aggregate(data, map(), lambdafunction(map_concat(namedlambdavariable(), namedlambdavariable()), namedlambdavariable(), namedlambdavariable()), lambdafunction(namedlambdavariable(), namedlambdavariable()))#125]
+- Aggregate [id#110], [id#110, collect_list(data#111, 0, 0) AS data#119]
   +- Project [_1#105 AS id#110, _2#106 AS data#111]
      +- LocalRelation [_1#105, _2#106]

似乎map()被初始化为map

要解决此问题,只需将 map() 转换为地图

以下是更新的代码:

val mergeExpr = expr("aggregate(data, cast(map() as map<string,
string>), (acc, i) -> map_concat(acc, i))")

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeExpr)
  .show(false)
Caused by: RuntimeException: Duplicate map key k5 was found, please check the input data. If you want to remove the duplicated keys, you can set spark.sql.mapKeyDedupPolicy to LAST_WIN so that the key inserted at last takes precedence.

 类似资料:
  • 我正在考虑将dataset1分解为每个“T”类型的多个记录,然后与DataSet2连接。但是你能给我一个更好的方法,如果数据集变大了,它不会影响性能吗?

  • 我有两个数据帧,DF1和DF2,DF1是存储来自DF2的任何附加信息的主机。 假设DF1是以下格式, DF2包含DF1中已经存在的2个条目和两个新条目。(itemId和item被视为一个组,可以被视为连接的键) 我需要合并两个数据框,以便增加现有项目计数并插入新项目。 结果应该是这样的: 我有一种方法可以做到这一点,但不确定这种方法是否有效或正确

  • 这是我的密码: 我想知道如何将df3绑定到单个数据帧中作为"NA"s? 我在r_blogger上找到了一篇关于将向量或长度不等的数据帧组合成一个数据帧的文章。http://www.r-bloggers.com/r-combining-vectors-or-data-frames-of-unequal-length-into-one-data-frame/ 但是我从数据中得到的数据框,其中一些是空的

  • 我有不同的数据帧,需要根据日期列将它们合并在一起。如果我只有两个数据帧,我可以使用,要使用三个数据帧,我可以使用,但是使用多个数据帧会变得非常复杂和不可读。 所有数据帧都有一个公共列-,但它们的行数和列数都不相同,我只需要其中每个日期对每个数据帧都是公共的行。 所以,我试图编写一个递归函数,返回一个包含所有数据的数据帧,但它不起作用。那么我应该如何合并多个数据帧呢? 我尝试了不同的方法,得到了一些

  • 问题内容: 我有2个带有通用列/键(x,y)的熊猫数据框df1和df2。 我想对键(x,y)进行“(df1&not df2)”合并,这意味着我希望我的代码返回仅包含df1中而不包含df2中包含(x,y)行的数据框。 SAS具有等效功能 谁能优雅地在熊猫中复制相同的功能?如果我们可以在merge()中指定how =“ left-right”,那就太好了。 问题答案: 我刚刚升级到10天前发布的版本0

  • RDD是以数组[数组[字符串]的格式创建的,具有以下值: 我想用模式创建一个数据帧: 接下来的步骤: 给出以下错误: