当前位置: 首页 > 知识库问答 >
问题:

Spark中用户定义的聚合函数以实现百分位数

桓宜
2023-03-14

我试图编写< code>udaf来计算< code>percentile值。

我需要编写自定义函数,因为现有的火花函数percentile_approxapprox_percentile百分位使用舍入不同于我的需要。

我需要使用地板而不是中点舍入。我可以用<code>pyspark<code>编写它吗?

如果不是,如何在scala中实现这一点?

我需要使用以下方法计算百分位数

def percentile_custom(lst, per):
    lst.sorted()
    rank = (len(lst)+1)*per
    ir = math.floor(rank)
    ir1 = math.ceil(rank)
    if (ir == ir1):
        return lst[ir-1]

    else: 
        fr = rank - ir
        ir_qh = lst[ir-1]
        ir_qh1 = lst[ir]
        inter = ((ir_qh1 - ir_qh)*fr) + ir_qh
        return math.floor(inter) 

共有1个答案

乐山
2023-03-14

以下是我在pyspark中编写的相同函数,请告诉我,以防万一它不适合您:

from pyspark.sql import Window
import math
import pyspark.sql.types as T
import pyspark.sql.functions as F

def calc_percentile(perc_df, part_col, order_col, p_val=[33,66], num_bins=100, max_bins = 100, perc_col="p_band"):
    """
        Calculate percentile with nimber of bins on specified columns
    """
    win = Window.partitionBy(*part_col).orderBy(order_col)
    def perc_func(col, num, max_bins):
        step = max_bins / num
        return {(p_tile / step): int(
            math.ceil(col * (p_tile / float(max_bins)))
        )  for p_tile in range(step, max_bins + step, step)}
    perc_udf = F.udf(perc_func, T.MapType(T.IntegerType(), T.IntegerType()))
#     perc_df.show()
    rank_data = perc_df.filter(
        F.col(order_col).isNotNull()
    ).withColumn(
        "rank", F.dense_rank().over(win)
    )

    rank_data.persist()
    rank_data.count()

    overall_count_data = rank_data.groupBy(
        *part_col
    ).agg(
        F.max(
            F.col("rank")
        ).alias("count")
    ).select(
        F.explode(
            perc_udf(F.col("count"), F.lit(num_bins), F.lit(max_bins))
        ).alias("n_tile", "rank"), "count",
        *part_col
    )
    overall_count_data.persist()
    overall_count_data.count()
    return overall_count_data.join(
        rank_data, part_col + ["rank"]
    ).withColumn(
        perc_col,
        F.concat(F.lit("P_"), F.col("n_tile").cast("string"))
    ).groupBy(
        *part_col
    ).pivot(
        perc_col, ["P_{0}".format(p_val1) for p_val1 in p_val]
    ).agg(
        F.max(order_col)
    ).select(
        *(
            part_col + [F.col("P_{0}".format(p_val1)) for p_val1 in p_val]
        )
    )
 类似资料:
  • 我知道如何在SparkSQL中编写UDF: 我可以做类似的事情来定义聚合函数吗?这是怎么做到的? 对于上下文,我想运行以下SQL查询: 它应该会返回类似于 我希望聚合函数告诉我,在由< code>span和< code>timestamp定义的组中,是否有任何< code>opticalReceivePower的值低于阈值。我需要把我的UDAF写得和我上面粘贴的UDF不同吗?

  • 我正在测试Cassandra中的UDF/UDA特性,看起来不错。但我在使用它时没有什么问题。 1) 在卡桑德拉。yaml,有人提到启用沙箱是为了避免邪恶代码,那么我们是否违反了规则,启用此支持(标志)会产生什么后果? 2)与在客户端读取数据和编写聚合逻辑相比,在Cassandra中使用UDF/UDA有什么优势? 3)此外,除了JAVA之外,是否有一种语言支持可用于编写UDF/UDA的nodejs、

  • 我想在UDAF中传递一个数组作为输入模式。 我给出的例子非常简单,它只是对2个向量求和。实际上我的用例更复杂,我需要使用UDAF。 在“显示”动作之前,所有这些都可以很好地进行转换。但这部剧引发了一个错误: 斯卡拉。MatchError:[WrappedArray(21.4,24.9,22.0)](属于org.apache.spark.sql.execution.aggregate.InputAg

  • 我需要聚合一个基于1分钟时间间隔的数据集。当我尝试此操作时,它会抛出错误: 我的数据集如下所示 org.apache.spark.sql.AnalysisException:无法解析(datetime,value)中的列名“60秒”;在org.apache.spark.sql.dataset$$anonfun$resolve$1.apply(dataset.scala:216)在org.apach

  • 问题内容: 我正在尝试在Oracle中编写一个自定义聚合函数,并将该函数与其他一些函数一起分组在一个包中。作为一个示例(为了模拟我遇到的问题),假设我的自定义聚合对数字进行求和看起来像: 如果我编写以下函数定义: 和相应的类型声明进行测试: 这个说法: 给出正确的结果70。但是,使用函数定义创建一个包: 并通过以下方式调用: 与爆炸 是否可以在包声明中嵌套自定义聚合函数? 问题答案: Oracle

  • 我有一个数据帧如下,我试图得到最大(总和)的用户组名称。 下面是我用来为用户获取最大值(总和)的自定义项 当我运行udf时,它抛出了下面的错误 错误:无法执行用户定义的函数($anonfun$1:(数组)= 因为我需要在更大的数据集中实现这一点,所以如何以更好的方式实现这一点 预期的结果是