当前位置: 首页 > 知识库问答 >
问题:

Spark DataFrame/Dataset为每个键查找最常见的值高效的方式

龚征
2023-03-14

问题:我有一个问题,以映射最常见的一个键的火花值(使用scala)。我已经用RDD做了,但不知道如何有效地使用DF/DS(Sparksql)

数据集就像

key1 = value_a
key1 = value_b
key1 = value_b
key2 = value_a
key2 = value_c
key2 = value_c
key3 = value_a

经过spark转换和访问输出后,每个键都应具有其公共值

输出

key1 = valueb
key2 = valuec
key3 = valuea

尝试到现在:

RDD

我曾尝试在RDD中按(键、值)组进行映射和减少,计数,它会产生逻辑,但我无法将其转换为sparksql(数据帧/数据集)(因为我希望在网络中进行最小的无序移动)

这是我的RDD代码

 val data = List(

"key1,value_a",
"key1,value_b",
"key1,value_b",
"key2,value_a",
"key2,value_c",
"key2,value_c",
"key3,value_a"

)

val sparkConf = new SparkConf().setMaster("local").setAppName("example")
val sc = new SparkContext(sparkConf)

val lineRDD = sc.parallelize(data)

val pairedRDD = lineRDD.map { line =>
val fields = line.split(",")
(fields(0), fields(2))
}

val flatPairsRDD = pairedRDD.flatMap {
  (key, val) => ((key, val), 1)
}

val SumRDD = flatPairsRDD.reduceByKey((a, b) => a + b)




val resultsRDD = SumRDD.map{
  case ((key, val), count) => (key, (val,count))
 }.groupByKey.map{
  case (key, valList) => (name, valList.toList.sortBy(_._2).reverse.head)
}

resultsRDD.collect().foreach(println)

DataFrame,使用窗口:我正在尝试使用窗口。partitionBy(“key”、“value”)来聚合窗口中的计数。和thn排序agg()


共有2个答案

葛鸿熙
2023-03-14

使用groupBy怎么样?

val maxFreq= udf((values: List[Int]) => {
  values.groupBy(identity).mapValues(_.size).maxBy(_._2)._1
})

df.groupBy("key")
  .agg(collect_list("value") as "valueList")
  .withColumn("mostFrequentValue", maxFreq(col("valueList")))
刘畅
2023-03-14

根据我对你问题的理解,这是你能做的

首先,您必须读取数据并将其转换为dataframe

val df = sc.textFile("path to the data file")   //reading file line by line
  .map(line => line.split("="))                 // splitting each line by =
  .map(array => (array(0).trim, array(1).trim)) //tuple2(key, value) created
  .toDF("key", "value")                        //rdd converted to dataframe which required import sqlContext.implicits._

哪一个呢

+----+-------+
|key |value  |
+----+-------+
|key1|value_a|
|key1|value_b|
|key1|value_b|
|key2|value_a|
|key2|value_c|
|key2|value_c|
|key3|value_a|
+----+-------+

下一步是计算每个键相同值的重复次数,并选择每个键重复次数最多的值,可以使用窗口功能和聚合,如下所示

import org.apache.spark.sql.expressions._                   //import Window library
def windowSpec = Window.partitionBy("key", "value")         //defining a window frame for the aggregation
import org.apache.spark.sql.functions._                     //importing inbuilt functions
df.withColumn("count", count("value").over(windowSpec))     // counting repeatition of value for each group of key, value and assigning that value to new column called as count
  .orderBy($"count".desc)                                   // order dataframe with count in descending order
  .groupBy("key")                                           // group by key
  .agg(first("value").as("value"))                          //taking the first row of each key with count column as the highest

因此,最终输出应等于

+----+-------+
|key |value  |
+----+-------+
|key3|value_a|
|key1|value_b|
|key2|value_c|
+----+-------+ 
 类似资料:
  • 问题内容: 假设我有一个具有属性X的表A,如何找到出现次数最多的X?(可以有多个出现次数最高的事件) 即表A 我想回来 我不能在Sqlite中使用关键字ALL,所以我很茫然。 我想到了获取每个X的计数,然后对其进行排序,然后以某种方式使用ORDER BY DESC,以使最大数位于顶部,然后与LIMIT进行比较,以检查第一个元组以下的值是否相等(这意味着它们只是一样),但我不确定LIMIT语法以及是

  • 问题内容: 编辑:我正在使用MySQL,我发现了另一个具有相同问题的帖子,但是它在Postgres中。我需要MySQL。 获取SQL中另一列的每个值的最通用值 在广泛搜索本网站和其他网站之后,我提出了这个问题,但没有找到符合我预期目的的结果。 我有一个人表(recordid,personid,transactionid)和一个事务表(transactionid,rating)。我需要一条SQL语句

  • 有人能提出一种有效的方法,在一列中为另一列中的每个唯一值获取最高值吗 np。数组如下所示[column0,column1,column2,column3] 其中我想根据第3列的唯一值返回第1列的最高值。之后的新数组应该是这样的: 我知道如何通过循环来做到这一点,但这不是我所关心的,因为我工作的桌子很大,我想避免循环

  • 我试图在许多字符串上找到一个常见的(最常见的,甚至是“平均”)短语。不同字符串之间的结构非常糟糕,并且充满了不一致性。许多字符串都添加了与所需输出无关的唯一位:一个新字符串,其作用类似于字符串集的某种摘要。 为了说明这一点,我提供了一个小示例,实际数据更复杂,由更多字符串组成: 示例数据: 在一个更小的数据集上,结构要复杂得多,我以前一直依赖字数: 在这个更简单的数据集上,只需选取n个最常见的短语

  • 问题内容: 我将如何查找数组中三个最常见的元素?我正在使用长度为10,000的数组,元素为0-100之间的随机整数。 我正在考虑使用两个数组,其中一个长度为100,并且仅通过使用if语句来递增。但是,我想知道是否有一种方法只能使用一个for / if loop(statement)来查找这些值。 问题答案: 如果要通过列表中的固定次数进行此操作,则需要第二个数据结构。 如果该集合中的值有上限和下限