Question:

Spark: update values in the second dataset based on values in the first dataset

夏振国
2023-03-14

我有两个火花数据集,其中一个列的帐户和键,键列在数组的格式[key1, key2, key3...]和另一个数据集的两个列的帐户和键值是在json.帐户,{key:值,键,值...}。我需要更新第二个数据集中的值,如果键出现在第一个数据集中。

import org.apache.spark.sql.functions._
val df= sc.parallelize(Seq(("20180610114049", "id1","key1"),
  ("20180610114049", "id2","key2"),
  ("20180610114049", "id1","key1"),
  ("20180612114049", "id2","key1"),
  ("20180613114049", "id3","key2"),
  ("20180613114049", "id3","key3")
 )).toDF("date","accountid", "key")
val gp=df.groupBy("accountid","date").agg(collect_list("key"))

+---------+--------------+-----------------+
|accountid|          date|collect_list(key)|
+---------+--------------+-----------------+
|      id2|20180610114049|           [key2]|
|      id1|20180610114049|     [key1, key1]|
|      id3|20180613114049|     [key2, key3]|
|      id2|20180612114049|           [key1]|
+---------+--------------+-----------------+


val df2= sc.parallelize(Seq(("20180610114049", "id1","{'key1':'0.0','key2':'0.0','key3':'0.0'}"),
  ("20180610114049", "id2","{'key1':'0.0','key2':'0.0','key3':'0.0'}"),
  ("20180611114049", "id1","{'key1':'0.0','key2':'0.0','key3':'0.0'}"),
  ("20180612114049", "id2","{'key1':'0.0','key2':'0.0','key3':'0.0'}"),
  ("20180613114049", "id3","{'key1':'0.0','key2':'0.0','key3':'0.0'}")
 )).toDF("date","accountid", "result")

+--------------+---------+----------------------------------------+
|date          |accountid|result                                  |
+--------------+---------+----------------------------------------+
|20180610114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180610114049|id2      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180611114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180612114049|id2      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180613114049|id3      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
+--------------+---------+----------------------------------------+

Expected output

+--------------+---------+----------------------------------------+
|date          |accountid|result                                  |
+--------------+---------+----------------------------------------+
|20180610114049|id1      |{'key1':'1.0','key2':'0.0','key3':'0.0'}|
|20180610114049|id2      |{'key1':'0.0','key2':'1.0','key3':'0.0'}|
|20180611114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180612114049|id2      |{'key1':'1.0','key2':'0.0','key3':'0.0'}|
|20180613114049|id3      |{'key1':'0.0','key2':'1.0','key3':'1.0'}|
+--------------+---------+----------------------------------------+
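
For reference, on Spark 3.0+ the same update can be sketched without a UDF by parsing the JSON into a MapType column and rewriting it with transform_values. This is only a sketch under those assumptions: it aliases the collected column as keys, and the names updated and m are illustrative; the single-quote replacement is needed because from_json only accepts standard double-quoted JSON.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val gp = df.groupBy("accountid", "date").agg(collect_list("key").as("keys"))

val updated = df2.join(gp, Seq("date", "accountid"), "left_outer")
  // the sample JSON uses single quotes, which from_json will not parse
  .withColumn("m", from_json(regexp_replace(col("result"), "'", "\""),
                             MapType(StringType, StringType)))
  // flip any key found in the collected array to "1.0", keep the rest
  .withColumn("result", to_json(transform_values(col("m"), (k, v) =>
    when(array_contains(coalesce(col("keys"), array().cast("array<string>")), k), lit("1.0"))
      .otherwise(v))))
  .drop("m", "keys")

updated.show(false)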

2 answers in total

堵彬彬
2023-03-14

After joining the two dataframes, you can use a udf function to achieve your requirement. Of course, there are also a few other things involved, such as converting the json to a struct, converting the struct back to json, the case class usage, and so on (comments are provided in the code for further explanation).

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

//aliasing the collected keys
val gp = df.groupBy("accountid","date").agg(collect_list("key").as("keys"))

//schema for converting the json string to a struct
val schema = StructType(Seq(StructField("key1", StringType, true), StructField("key2", StringType, true), StructField("key3", StringType, true)))

//udf function to update the values of the struct, where result is a case class (defined below)
def updateKeysUdf = udf((arr: Seq[String], json: Row) => Seq(json.schema.fieldNames.map(key => if(arr.contains(key)) "1.0" else json.getAs[String](key))).collect{case Array(a,b,c) => result(a,b,c)}.toList(0))

//changing the json string to a struct using the above schema
df2.withColumn("result", from_json(col("result"), schema))
  .as("df2")   //aliasing df2 for joining and selecting
  .join(gp.as("gp"), col("df2.accountid") === col("gp.accountid"), "left")   //aliasing the gp dataframe and joining on accountid
  .select(col("df2.accountid"), col("df2.date"), to_json(updateKeysUdf(col("gp.keys"), col("df2.result"))).as("result"))  //calling the udf and converting the struct back to a json string
  .show(false)

where result is a case class

case class result(key1: String, key2: String, key3: String)
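
As a side note (a sketch, not part of the original answer): the hand-written StructType above can equivalently be derived from this case class via Encoders, so the field names and types stay in one place.

import org.apache.spark.sql.Encoders

//same as the StructType written out by hand above
val schema = Encoders.product[result].schema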

which should give you

+---------+--------------+----------------------------------------+
|accountid|date          |result                                  |
+---------+--------------+----------------------------------------+
|id3      |20180613114049|{"key1":"0.0","key2":"1.0","key3":"1.0"}|
|id1      |20180610114049|{"key1":"1.0","key2":"0.0","key3":"0.0"}|
|id1      |20180611114049|{"key1":"1.0","key2":"0.0","key3":"0.0"}|
|id2      |20180610114049|{"key1":"0.0","key2":"1.0","key3":"0.0"}|
|id2      |20180610114049|{"key1":"1.0","key2":"0.0","key3":"0.0"}|
|id2      |20180612114049|{"key1":"0.0","key2":"1.0","key3":"0.0"}|
|id2      |20180612114049|{"key1":"1.0","key2":"0.0","key3":"0.0"}|
+---------+--------------+----------------------------------------+

I hope the answer is helpful.

须曜文
2023-03-14

You definitely need a UDF to do this cleanly.

You can pass the array and the JSON to a UDF after joining on date and accountid, parse the JSON inside the UDF using the parser of your choice (I'm using json4s in the example), check whether a key exists in the array, change the values accordingly, convert the result back to JSON, and return it from the UDF.

val gp=df.groupBy("accountid","date").agg(collect_list("key").as("key"))

val joined = df2.join(gp, Seq("date", "accountid") , "left_outer")

joined.show(false)
//+--------------+---------+----------------------------------------+------------+
//|date          |accountid|result                                  |key         |
//+--------------+---------+----------------------------------------+------------+
//|20180610114049|id2      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|[key2]      |
//|20180613114049|id3      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|[key2, key3]|
//|20180610114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|[key1, key1]|
//|20180611114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|null        |
//|20180612114049|id2      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|[key1]      |
//+--------------+---------+----------------------------------------+------------+

// the UDF that will do the most work
// it's important to declare `formats` inside the function
// to avoid object not Serializable exception
// Not all cases are covered, use with caution :D
val convertJsonValues = udf{(json: String, arr: Seq[String]) =>
    import org.json4s.jackson.JsonMethods._
    import org.json4s.JsonDSL._
    implicit val format = org.json4s.DefaultFormats
    // replace single quotes with double
    val kvMap = parse(json.replaceAll("'", """"""")).values.asInstanceOf[Map[String,String]]
    val updatedKV = kvMap.map{ case(k,v) => if(arr.contains(k)) (k,"1.0") else (k,v) }
    compact(render(updatedKV))
}

// Use when-otherwise and send empty array where `key` is null
joined.select($"date", 
              $"accountid",
              when($"key".isNull, convertJsonValues($"result", array()))
               .otherwise(convertJsonValues($"result", $"key"))
               .as("result")
              ).show(false)

//+--------------+---------+----------------------------------------+
//|date          |accountid|result                                  |
//+--------------+---------+----------------------------------------+
//|20180610114049|id2      |{"key1":"0.0","key2":"1.0","key3":"0.0"}|
//|20180613114049|id3      |{"key1":"0.0","key2":"1.0","key3":"1.0"}|
//|20180610114049|id1      |{"key1":"1.0","key2":"0.0","key3":"0.0"}|
//|20180611114049|id1      |{"key1":"0.0","key2":"0.0","key3":"0.0"}|
//|20180612114049|id2      |{"key1":"1.0","key2":"0.0","key3":"0.0"}|
//+--------------+---------+----------------------------------------+
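
The when-otherwise null handling can also be folded into a single coalesce call; a sketch using the same joined dataframe and UDF as above:

// send an empty array where `key` is null, same effect as when/otherwise
joined.select($"date",
              $"accountid",
              convertJsonValues($"result", coalesce($"key", array())).as("result"))
      .show(false)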