+------------+------+------+--------+
| population | rate | year | city |
+------------+------+------+--------+
| 100 | 0.1 | 1 | one |
+------------+------+------+--------+
| 100 | 0.11 | 2 | one |
+------------+------+------+--------+
| 100 | 0.12 | 3 | one |
+------------+------+------+--------+
| 200 | 0.1 | 1 | two |
+------------+------+------+--------+
| 1000 | 0.21 | 2 | three |
+------------+------+------+--------+
| 1000 | 0.22 | 3 | three |
+------------+------+------+--------+
就我所理解的你的描述,你所需要的只是一些基本的代数和窗口函数。首先让我们重新创建示例数据:
import pandas as pd # Just to make a reproducible example
pdf = pd.DataFrame({
'city': {0: 'one', 1: 'one', 2: 'one', 3: 'two', 4: 'three', 5: 'three'},
'population': {0: 100, 1: 100, 2: 100, 3: 200, 4: 1000, 5: 1000},
'rate': {0: 0.10000000000000001,
1: 0.11,
2: 0.12,
3: 0.10000000000000001,
4: 0.20999999999999999,
5: 0.22},
'year': {0: 1, 1: 2, 2: 3, 3: 1, 4: 2, 5: 3}})
df = sqlContext.createDataFrame(pdf)
df.show()
## +-----+----------+----+----+
## | city|population|rate|year|
## +-----+----------+----+----+
## | one| 100| 0.1| 1|
## | one| 100|0.11| 2|
## | one| 100|0.12| 3|
## | two| 200| 0.1| 1|
## |three| 1000|0.21| 2|
## |three| 1000|0.22| 3|
## +-----+----------+----+----+
接下来我们定义窗口:
import sys
from pyspark.sql.window import Window
from pyspark.sql.functions import exp, log, sum, first, col, coalesce
# Base window
w = Window.partitionBy("city").orderBy("year")
# ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING
wr = w.rowsBetween(-sys.maxsize, -1)
和一些列:
# Take a sum of logarithms of rates over the window
log_sum = sum(log(col("rate") + 1)).over(wr)
# Take sum of logs and exponentiate to go back to original space
cumulative_rate = exp(log_sum).alias("cumulative_rate")
# Find base population for each group
base_population = first("population").over(w).alias("base_population")
# Prepare final column (base population * cumulative product of rates)
current_population = coalesce(
# This is null for the first observation in a group
cumulative_rate * base_population,
# so we provide population as an alternative
col("population")
).alias("current_population")
df.select("*", current_population).show()
## +-----+----------+----+----+------------------+
## | city|population|rate|year|current_population|
## +-----+----------+----+----+------------------+
## |three| 1000|0.21| 2| 1000.0|
## |three| 1000|0.22| 3| 1210.0|
## | two| 200| 0.1| 1| 200.0|
## | one| 100| 0.1| 1| 100.0|
## | one| 100|0.11| 2|110.00000000000001|
## | one| 100|0.12| 3|122.10000000000004|
## +-----+----------+----+----+------------------+
我知道这不是一个人应该在这里做的事情,尽管我不知道否则我怎么能问这个问题。 我的目标是统计有多少行具有相同的第一预测值(购买、维护等),但评级不同。 我的尝试是通过第一列来计算,然后计算我得到的不同评分,这将是非常棒的。 一个重要的假设是,如果两行相同,则它们已被预先擦除。因此,可以找到不同等级的行,但没有相同的行。 在我的示例中,第1行和第3行是相同的,因此其中一行被擦除,没有问题。然而,对于其
下面有一段代码,它创建了数据框中每列中缺失值的汇总表。我希望我可以构建一个类似的表来计算唯一的值,但是DataFrame没有唯一的()方法,只有每一列是独立的。 (资料来源:https://stackoverflow.com/a/39734251/7044473) 如何为唯一值实现相同的功能?
我想编写一个具有重分区的大型数据帧,所以我想计算源数据帧的重分区数。 数据帧/default_blocksize的大小 所以请告诉我如何在spark scala中计算数据帧的大小 提前谢谢。
我想在数据的每一列中找到的数量,这样,如果某一列的少于某个阈值,我就可以删除该列。我看了一下,但没有找到任何功能。对我来说太慢了,因为大多数值都是不同的,而且我只对计数感兴趣。
我有一个带有五列的Spark。我想添加另一列,其值是第一列和第二列的元组。当与with Col列()方法一起使用时,我得到不匹配错误,因为输入不是列类型,而是(列,列)。我想知道在这种情况下,除了在行上运行循环之外是否有解决方案?
我有以下格式的Apache Spark数据帧 我想在DataFrame中添加一个新列:PreviousPhaseName。此列应指示同一过程的前一个不同阶段。进程的第一阶段(具有最小ID的阶段)将与前一阶段一样具有。当一个阶段发生两次或两次以上时,第二次(第三次...)事件将具有相同的previousPhaseName,例如: 我不确定如何实施这一点。我的第一个方法是: 创建第二个空数据帧DF2