当前位置: 首页 > 知识库问答 >
问题:

在spark scala中为数据帧中的每个组采样不同数量的随机行

琴俊良
2023-03-14

目标是为每个组在数据帧中采样(不替换)不同数量的行。特定组要采样的行数在另一个数据帧中。

示例:idDF是要从中采样的数据帧。这些组由ID列表示。数据框planDF指定每个组要采样的行数,其中“datesToUse”表示行数,“ID”表示组。“totalDates”是该组的总行数,可能有用,也可能无用。

最终结果应包括从第一组(ID 1)中取样的3行、从第二组(ID 2)中取样的2行和从第三组(ID 3)中取样的1行。

val idDF = Seq(
  (1, "2017-10-03"),
  (1, "2017-10-22"),
  (1, "2017-11-01"),
  (1, "2017-10-02"),
  (1, "2017-10-09"),
  (1, "2017-12-24"),
  (1, "2017-10-20"),
  (2, "2017-11-17"),
  (2, "2017-11-12"),
  (2, "2017-12-02"),      
  (2, "2017-10-03"),
  (3, "2017-12-18"),
  (3, "2017-11-21"),
  (3, "2017-12-13"),
  (3, "2017-10-08"),
  (3, "2017-10-16"),
  (3, "2017-12-04")
 ).toDF("ID", "date")

val planDF = Seq(
  (1, 3, 7),
  (2, 2, 4),
  (3, 1, 6)
 ).toDF("ID", "datesToUse", "totalDates")

这是一个示例,说明了生成的数据帧应该是什么样子:

+---+----------+
| ID|      date|
+---+----------+
|  1|2017-10-22|
|  1|2017-11-01|
|  1|2017-10-20|
|  2|2017-11-12|
|  2|2017-10-03|
|  3|2017-10-16|
+---+----------+

到目前为止,我尝试使用DataFrame的示例方法:https://spark.apache.org/docs/1.5.0/api/java/org/apache/spark/sql/DataFrame.html下面是一个适用于整个数据帧的示例。

def sampleDF(DF: DataFrame, datesToUse: Int, totalDates: Int): DataFrame = {
  val fraction = datesToUse/totalDates.toFloat.toDouble
  DF.sample(false, fraction)
}

我不知道如何在每个小组中使用这样的东西。我尝试将planDF表连接到idDF表,并使用窗口分区。

我的另一个想法是以某种方式创建一个随机标记为True/false的新列,然后在该列上进行筛选。

共有2个答案

干鑫鹏
2023-03-14

假设您的planDF足够小,可以collected,您可以使用Scala的foldLeft遍历id列表,并根据id累积样本数据帧:

import org.apache.spark.sql.{Row, DataFrame}

def sampleByIdDF(DF: DataFrame, id: Int, datesToUse: Int, totalDates: Int): DataFrame = {
  val fraction = datesToUse.toDouble / totalDates
  DF.where($"id" === id ).sample(false, fraction)
}

val emptyDF = Seq.empty[(Int, String)].toDF("ID", "date")

val planList = planDF.rdd.collect.map{ case Row(x: Int, y: Int, z: Int) => (x, y, z) }
// planList: Array[(Int, Int, Int)] = Array((1,3,7), (2,2,4), (3,1,6))

planList.foldLeft( emptyDF ){
  case (accDF: DataFrame, (id: Int, num: Int, total: Int)) =>
    accDF union sampleByIdDF(idDF, id, num, total)
}
// res1: org.apache.spark.sql.DataFrame = [ID: int, date: string]

// res1.show
// +---+----------+
// | ID|      date|
// +---+----------+
// |  1|2017-10-03|
// |  1|2017-11-01|
// |  1|2017-10-02|
// |  1|2017-12-24|
// |  1|2017-10-20|
// |  2|2017-11-17|
// |  2|2017-11-12|
// |  2|2017-12-02|
// |  3|2017-11-21|
// |  3|2017-12-13|
// +---+----------+

请注意,方法sample()不一定会生成方法参数中指定的确切样本数。这是一个相关的问题

如果您的PLANDF很大,您可能需要考虑使用RDD的U、COMPOP:(u,u)=u(隐式证据30美元:Scala。IsCurras.CasStasg [U]):u“Re=“NoFoLoLoeFror”>聚合,它具有以下签名(跳过隐式参数):

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U

它的工作原理有点像foldLeft,不同的是它在一个分区内有一个累加运算符,另外还有一个累加运算符来处理来自不同分区的结果。

裴昕
2023-03-14

另一个完全停留在数据帧中的选项是使用planDF计算概率,加入idDF,附加一列随机数,然后过滤。有用的是,sql。函数有一个rand函数。

import org.apache.spark.sql.functions._

import spark.implicits._

val probabilities = planDF.withColumn("prob", $"datesToUse" / $"totalDates")

val dfWithProbs = idDF.join(probabilities, Seq("ID"))
  .withColumn("rand", rand())
  .where($"rand" < $"prob")

(您需要再次检查这不是整数除法。)

 类似资料:
  • 问题内容: 在不从javascript中的数组进行替换的情况下,获取随机样本的一种干净方法是什么?所以假设有一个数组 我想随机采样5个唯一值;例如,生成一个长度为5的随机子集。要生成一个随机样本,可以执行以下操作: 但是,如果多次执行此操作,则可能会多次捕获同一项。 问题答案: 我建议使用Fisher-Yates混洗混洗数组的副本并进行切片: 请注意,这不是获取大型数组的随机小子集的最有效方法,因

  • 这是一个有点复杂的解释,所以我希望它足够清楚,但如果不是,我会尝试和扩展更多。 所以我有一个这样的数据帧:

  • 我显示这样的数组: FieldListItem呈现方法是: 基本上,我使用切片只显示前3个元素,我想以不同的方式显示它们。例如,我想将第一个元素样式为标题-粗体,第二个元素为单行,字体应该更小,最后一个元素可以是多行,颜色应该不同。我有什么办法可以做到吗?

  • 我有一个包含40百万行的数据框。有一个名为的列指定一行的组标识符。一共有2000个群。 我想随机标记每组中的元素,并将此信息添加到的列中。例如,如果组1包含行1、2、3、4和5,那么我选择(1、2、3、4、5)的排列,例如,我们取(5、3、4、2、1)。然后我将值[5,3,4,2,1]分配给这些行的列。 我定义了一个函数,并使用了并行化,但是速度非常慢。你能建议一个更快的方法吗?

  • 在一个线程组中,有多个http请求采样器。我需要发送一个号码与每一个这些请求。对于每一个请求,这个数字应该是不同的。当我使用Jmeter随机变量为每个请求每次获得一个随机数时。我想的是,在一个线程中,当我在n个请求中调用这个变量n次时,它每次都给出一个不同的数字,然而它在每次调用中都给出相同的数字。

  • 我被分配了一项编程任务,但我被卡住了。其说明如下: 有一个名为“秘密圣诞老人”(给他们礼物)的游戏,有很多孩子参加。对于每个参与的孩子,都有一个来自参与孩子的秘密圣诞朋友。我必须编写一个程序,为每个参与的孩子挑选一个秘密的圣诞老人朋友。 示例:如果Bob,Alice,John和George是参与的孩子,在随机选择之后, 输出可能看起来像 具有相同输入的连续两次程序运行不应有相同的结果。 我的想法是