我试图在pySpark的一行代码中进行多项操作,但不确定我的情况是否可行。
我的意图是不必将输出另存为新的数据框。
我当前的代码非常简单:
encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
.groupBy('timePeriod')
.agg(
mean('DOWNSTREAM_SIZE').alias("Mean"),
stddev('DOWNSTREAM_SIZE').alias("Stddev")
)
.show(20, False)
我的意图是count()
在使用后添加groupBy
,以得到与每个 timePeriod
列值匹配的记录计数,这些记录打印\显示为输出。
尝试使用时,出现groupBy(..).count().agg(..)
异常。
有什么方法可以同时实现count()
和agg()
.show()打印,而无需将代码拆分为两行命令,例如:
new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()
或更好的办法是将合并的输出输出到agg.show()
-额外的一栏,用于说明与该行的值匹配的已记录记录数。例如:
timePeriod | Mean | Stddev | Num Of Records
X | 10 | 20 | 315
count()
可以在内部使用,agg()
因为groupBy
表达式相同。
import pyspark.sql.functions as func
new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"]))
.groupBy("timePeriod")
.agg(
func.mean("DOWNSTREAM_SIZE").alias("Mean"),
func.stddev("DOWNSTREAM_SIZE").alias("Stddev"),
func.count(func.lit(1)).alias("Num Of Records")
)
.show(20, False)
pySpark
SQL函数文档
import org.apache.spark.sql.functions._ //for count()
new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME")))
.groupBy("timePeriod")
.agg(
mean("DOWNSTREAM_SIZE").alias("Mean"),
stddev("DOWNSTREAM_SIZE").alias("Stddev"),
count(lit(1)).alias("Num Of Records")
)
.show(20, false)
count(1)
将按等于 count("timePeriod")
import static org.apache.spark.sql.functions.*;
new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME")))
.groupBy("timePeriod")
.agg(
mean("DOWNSTREAM_SIZE").alias("Mean"),
stddev("DOWNSTREAM_SIZE").alias("Stddev"),
count(lit(1)).alias("Num Of Records")
)
.show(20, false)
问题内容: 我正在寻找有关通过python中的spark可用的聚合功能的更好解释。 我的示例如下(使用来自Spark 1.2.0版本的pyspark) 输出: 我得到的预期结果是和4个元素的总和。如果我将传递给聚合函数的初始值更改为from, 则会得到以下结果 输出: 该值增加9。如果将其更改为,则该值为,依此类推。 有人可以向我解释该值是如何计算的吗?我希望该值增加1而不是9,希望看到相反的值。
我试图在火花数据帧中使用rowNumber。我的查询在Spark shell中按预期工作。但是当我在eclipse中写出它们并编译一个jar时,我面临着一个错误 我的问题 在Spark shell中运行查询时,我没有使用HiveContext。不确定为什么它返回一个错误,当我运行相同的jar文件。如果有帮助的话,我也在Spark 1.6.0上运行脚本。有人面临类似的问题吗?
我需要汇总以下记录中的所有标记: https://gist.github.com/sbassi/5642925 (这个片段中有2个样本记录)并按大小对它们进行排序(首先是出现频率更高的标记)。但是我不想考虑具有特定“user_id”的数据(比方说,2,3,6和12)。 以下是我的尝试(只是聚合,没有过滤和排序): db。用户库。聚合({$unwind:“$annotations.data.tags
一、简单聚合 1.1 数据准备 // 需要导入 spark sql 内置的函数包 import org.apache.spark.sql.functions._ val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate() val empDF = spark.read.jso
代码审查问题:带有Spark DataFrame的通用“还原按”或“分组按聚合”功能 好的,各位。也许我彻底改造了这里的轮子,或者也许我发明了一些有用的东西。你们谁能告诉我有没有更好的方法?以下是我想做的: 我想要一个通用的简化函数,它的工作原理类似于RDD的简化ByKey,但允许我使用Spark DataFrame中的任何列。你可能会说我们已经有了,它被称为groupBy,但据我所知,group
我有mongodb聚合查询,它在shell中工作得很好。如何重写此查询以便与morphia一起使用? 只接受一个字段名,但我需要向该集合添加对象。