当前位置: 首页 > 知识库问答 >
问题:

在基于列值进行分区后,将Bucketizer应用于Spark数据帧

包兴思
2023-03-14

我需要在下面的数据框df上应用火花划水器。这是模型数据。原始数据帧大约有10k记录。

 instance   name                 value    percentage
 A37        Histogram.ratio      1            0.20
 A37        Histogram.ratio      20           0.34           
 A37        Histogram.ratio      50           0.04           
 A37        Histogram.ratio      500          0.13           
 A37        Histogram.ratio      2000         0.05           
 A37        Histogram.ratio      9000         0.32           
 A49        Histogram.ratio      1            0.50
 A49        Histogram.ratio      20           0.24           
 A49        Histogram.ratio      25           0.09           
 A49        Histogram.ratio      55           0.12           
 A49        Histogram.ratio      120          0.06           
 A49        Histogram.ratio      300          0.08

在按列实例对数据帧进行分区之后,我需要应用bucketizer。实例中的每个值都有下面定义的不同分割数组

val splits_map =  Map("A37" -> Array(0,30,1000,5000,9000), "A49" -> Array(0,10,30,80,998))

我将使用下面的代码在单列上执行bucketing。但是需要通过实例列对数据帧进行分区,然后应用bucketizer。使改变

val bucketizer = new Bucketizer().setInputCol("value").setOutputCol("value_range").setSplits(splits)
val df2 = bucketizer.transform(df)

df2.groupBy("value_range").sum("percentage").show()

是否可以使用列值实例将数据帧拆分为多个数据帧,然后对列进行压缩,然后使用groupBy()。sum()计算百分比之和。

预期产出:

instance   name                 bucket    percentage
A37        Histogram.ratio      0            0.54                
A37        Histogram.ratio      1            0.17           
A37        Histogram.ratio      3            0.05           
A37        Histogram.ratio      4            0.32           
A49        Histogram.ratio      0            0.50
A49        Histogram.ratio      1            0.33                     
A49        Histogram.ratio      2            0.12           
A49        Histogram.ratio      3            0.14   

共有1个答案

燕博文
2023-03-14

分区内数据备份的另一种方法是:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

def bucketizeWithinPartition(df: DataFrame, splits: Map[String, Array[Int]], partitionCol: String, featureCol: String): DataFrame = {
  val window = Window.partitionBy(partitionCol).orderBy($"bucket_start")

  val splitsDf = splits.toList.toDF(partitionCol, "splits")
    .withColumn("bucket_start", explode($"splits"))
    .withColumn("bucket_end", coalesce(lead($"bucket_start", 1).over(window), lit(Int.MaxValue)))
    .withColumn("bucket", row_number().over(window))

  val joinCond = "d.%s = s.%s AND d.%s >= s.bucket_start AND d.%s < bucket_end".format(partitionCol, partitionCol, featureCol, featureCol)
  df.as("d")
    .join(splitsDf.as("s"), expr(joinCond), "inner")
    .select($"d.*", $"s.bucket")
}


val data =
  List(
    ("A37", "Histogram.ratio", 1, 0.20),
    ("A37", "Histogram.ratio", 20, 0.34),
    ("A37", "Histogram.ratio", 9000, 0.32),
    ("A49", "Histogram.ratio", 1, 0.50),
    ("A49", "Histogram.ratio", 20, 0.24)
  ).toDF("instance", "name", "value", "percentage")

val splits_map =  Map("A37" -> Array(0,30,1000,5000,9000), "A49" -> Array(0,10,30,80,998))
val bucketedData = bucketizeWithinPartition(data, splits_map, "instance", "value")
 类似资料:
  • 我有一个像下面这样的DataFrame,标识符作为现有DateIndex顶部的列。 我的目标是为除id之外的每一列(a和B)创建一个新的子DataFrames,其中dateIndex作为单个索引,id(foo,bar)作为列名。预期产出如下所示:

  • 我有一个名为“segments”的数据帧,看起来像这样: 我想让它看起来像这样。其中Outdegree是包含列A中的值的行数,例如10135在3行中,因此每行Outdegree设置为3: 以下是我正在尝试但不起作用的内容: 该代码将所有行相加,并将总计值提供给OUTDEGREE列

  • null 火花:1.3 卡桑德拉:2.1 连接器:1.3.1 火花节点(5)和CASS*集群节点(4)运行在不同的数据中心 代码提取。请使用以上链接下载代码以获取更多详细信息 步骤1:将数据加载到8个spark分区中 加载Rdd值

  • 我正在处理一些数据,并希望将某个列的最大值按不同的列分组。但是,我想根据另一列从最大计算中排除某些行。 示例: 我想得到Col3的最大值,按Col1分组,同时排除Col2中包含“Other”的任何行。因此,“A”的Col3的最大值应该是5,而不是17。 我能够使用:但是,对于a,这将给我一个17的值。 通过查看其他线程,我尝试使用: 这似乎让我接近了(它将数据按Col1分组,并根据Col2删除了行

  • 我想根据某个列变量的不同值从数据框中选择行,并制作直方图。 输出:空数据框列:[年龄、工人阶级、fnlwgt、教育程度、受教育人数、婚姻状况、职业、关系、种族、性别、capitalgain、CapitalAlloss、每周小时数、国家、收入水平]索引:[] 从上面的几行可以看出,我试图选择收入水平为'

  • 在执行时,是否有一种方法可以将聚合函数应用于dataframe的所有列(或列表)?换句话说,是否有一种方法可以避免对每个列都这样做: