当前位置: 首页 > 面试题库 >

聚合功能在Spark中使用groupBy计数使用情况

斜宁
2023-03-14
问题内容

我试图在pySpark的一行代码中进行多项操作,但不确定我的情况是否可行。

我的意图是不必将输出另存为新的数据框。

我当前的代码非常简单:

encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
  .groupBy('timePeriod')
  .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"),
    stddev('DOWNSTREAM_SIZE').alias("Stddev")
  )
  .show(20, False)

我的意图是count()在使用后添加groupBy,以得到与每个 timePeriod 列值匹配的记录计数,这些记录打印\显示为输出。

尝试使用时,出现groupBy(..).count().agg(..)异常。

有什么方法可以同时实现count()agg() .show()打印,而无需将代码拆分为两行命令,例如:

new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()

或更好的办法是将合并的输出输出到agg.show()-额外的一栏,用于说明与该行的值匹配的已记录记录数。例如:

timePeriod | Mean | Stddev | Num Of Records
    X      | 10   |   20   |    315

问题答案:

count()可以在内部使用,agg()因为groupBy表达式相同。

import pyspark.sql.functions as func

new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"])) 
  .groupBy("timePeriod")
  .agg(
     func.mean("DOWNSTREAM_SIZE").alias("Mean"), 
     func.stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     func.count(func.lit(1)).alias("Num Of Records")
   )
  .show(20, False)

pySpark
SQL函数文档

与斯卡拉

import org.apache.spark.sql.functions._ //for count()

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)

count(1) 将按等于 count("timePeriod")

使用Java

import static org.apache.spark.sql.functions.*;

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)


 类似资料:
  • 问题内容: 我正在寻找有关通过python中的spark可用的聚合功能的更好解释。 我的示例如下(使用来自Spark 1.2.0版本的pyspark) 输出: 我得到的预期结果是和4个元素的总和。如果我将传递给聚合函数的初始值更改为from, 则会得到以下结果 输出: 该值增加9。如果将其更改为,则该值为,依此类推。 有人可以向我解释该值是如何计算的吗?我希望该值增加1而不是9,希望看到相反的值。

  • 我试图在火花数据帧中使用rowNumber。我的查询在Spark shell中按预期工作。但是当我在eclipse中写出它们并编译一个jar时,我面临着一个错误 我的问题 在Spark shell中运行查询时,我没有使用HiveContext。不确定为什么它返回一个错误,当我运行相同的jar文件。如果有帮助的话,我也在Spark 1.6.0上运行脚本。有人面临类似的问题吗?

  • 我需要汇总以下记录中的所有标记: https://gist.github.com/sbassi/5642925 (这个片段中有2个样本记录)并按大小对它们进行排序(首先是出现频率更高的标记)。但是我不想考虑具有特定“user_id”的数据(比方说,2,3,6和12)。 以下是我的尝试(只是聚合,没有过滤和排序): db。用户库。聚合({$unwind:“$annotations.data.tags

  • 一、简单聚合 1.1 数据准备 // 需要导入 spark sql 内置的函数包 import org.apache.spark.sql.functions._ val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate() val empDF = spark.read.jso

  • 代码审查问题:带有Spark DataFrame的通用“还原按”或“分组按聚合”功能 好的,各位。也许我彻底改造了这里的轮子,或者也许我发明了一些有用的东西。你们谁能告诉我有没有更好的方法?以下是我想做的: 我想要一个通用的简化函数,它的工作原理类似于RDD的简化ByKey,但允许我使用Spark DataFrame中的任何列。你可能会说我们已经有了,它被称为groupBy,但据我所知,group

  • 我有mongodb聚合查询,它在shell中工作得很好。如何重写此查询以便与morphia一起使用? 只接受一个字段名,但我需要向该集合添加对象。