我有一个像这样的PySpark数据帧
+----------+--------+----------+----------+
|id_ | p |d1 | d2 |
+----------+--------+----------+----------+
| 1 | A |2018-09-26|2018-10-26|
| 2 | B |2018-06-21|2018-07-19|
| 2 | B |2018-08-13|2018-10-07|
| 2 | B |2018-12-31|2019-02-27|
| 2 | B |2019-05-28|2019-06-25|
| 3 |C |2018-06-15|2018-07-13|
| 3 |C |2018-08-15|2018-10-09|
| 3 |C |2018-12-03|2019-03-12|
| 3 |C |2019-05-10|2019-06-07|
| 4 | A |2019-01-30|2019-03-01|
| 4 | A |2019-05-30|2019-07-25|
| 5 |C |2018-09-19|2018-10-17|
-------------------------------------------
在此基础上,我想创建并填充另一个Pyspark数据框,其中包含n
列,范围从min(d1)
到max(d2)
,每列都是该范围内的日期。
我想用每行1和0填充此数据框。
对于第1行,我希望使用1
填充从min(第1行的d1)到max(第1行的d1)范围内的所有天数,并使用0
填充其余列。对于数据帧中的所有行也是如此。
为了这个目的,我在熊猫身上做了类似的事情。
result = pd.DataFrame(data = 0, columns=pd.period_range(data['d1'].min(), data['d2'].max(), freq='D'), index=data.index)
for c in result.columns:
result[c] = np.where((c.d2>=data.d1)&(c.d1 <= data.d2), 1, 0)
如何在PySpark中执行同样的操作。?
这里是一种方法(我只使用了很少的行和较小的日期范围在这里打印输出)
from pyspark.sql import SparkSession,Row
import pyspark.sql.functions as F
import pyspark.sql.types as T
import datetime
def fill_dates(d1, d2, start_date, no_of_date_cols):
start_date = datetime.datetime.strptime(start_date, '%Y-%m-%d')
d1 = datetime.datetime.strptime(d1, '%Y-%m-%d')
d2 = datetime.datetime.strptime(d2, '%Y-%m-%d')
cols = {}
for x in range(0, no_of_date_cols):
col = (start_date + datetime.timedelta(days=x)).strftime('%Y-%m-%d')
if datetime.datetime.strptime(col, '%Y-%m-%d') >= d1 and datetime.datetime.strptime(col, '%Y-%m-%d') <= d2:
cols[col] = 1
else:
cols[col] = 0
return cols
spark = SparkSession \
.builder \
.appName("Filling_Dates_Cols") \
.config("master", "local") \
.getOrCreate()
df = spark.createDataFrame([
[1, 'A', '2018-09-26', '2018-09-28'],
[2, 'B', '2018-09-20', '2018-09-22'],
[2, 'B', '2018-09-23', '2018-09-26'],
[3, 'C', '2018-09-15', '2018-09-26']
], schema=['id', 'p', 'd1', 'd2'])
min_max_dates = df.select(
F.min('d1').alias('min'),
F.max('d2').alias('max')
).collect()[0]
min_date = min_max_dates[0]
max_date = min_max_dates[1]
d1 = datetime.datetime.strptime(min_date, '%Y-%m-%d')
d2 = datetime.datetime.strptime(max_date, '%Y-%m-%d')
no_of_date_cols = (d2 - d1).days + 1
schema = T.StructType()
for x in range(0, no_of_date_cols):
new_col = (d1 + datetime.timedelta(days=x)).strftime('%Y-%m-%d')
schema = schema.add(new_col, T.IntegerType())
fill_dates_udf = F.udf(fill_dates, schema)
df = df.withColumn(
'dates',
fill_dates_udf(F.col('d1'), F.col('d2'), F.lit(min_date), F.lit(no_of_date_cols))
)
df.select('id', 'p', 'd1', 'd2', 'dates.*').show()
导致
+---+---+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
| id| p| d1| d2|2018-09-15|2018-09-16|2018-09-17|2018-09-18|2018-09-19|2018-09-20|2018-09-21|2018-09-22|2018-09-23|2018-09-24|2018-09-25|2018-09-26|2018-09-27|2018-09-28|
+---+---+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
| 1| A|2018-09-26|2018-09-28| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 1| 1| 1|
| 2| B|2018-09-20|2018-09-22| 0| 0| 0| 0| 0| 1| 1| 1| 0| 0| 0| 0| 0| 0|
| 2| B|2018-09-23|2018-09-26| 0| 0| 0| 0| 0| 0| 0| 0| 1| 1| 1| 1| 0| 0|
| 3| C|2018-09-15|2018-09-26| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 0| 0|
+---+---+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
列表理解的一种方法:
更新:根据请求,将d1
,d2
字段从StringType调整为DateType。
import pandas as pd
from pyspark.sql import functions as F
#... skip the code to initialize SparkSession spark and df
# if d1 and d2 were read as String, convert them to Date using the following.
# Or if the data were already imported with explicit schema or inferSchema=True when running read.csv(), then skip the following:
df = df.withColumn('d1', F.to_date('d1')) \
.withColumn('d2', F.to_date('d2'))
>>> df.show()
+---+---+----------+----------+
|id_| p| d1| d2|
+---+---+----------+----------+
| 1| A|2018-09-26|2018-10-26|
| 2| B|2018-06-21|2018-07-19|
| 2| B|2018-08-13|2018-10-07|
| 2| B|2018-12-31|2019-02-27|
| 2| B|2019-05-28|2019-06-25|
| 3| C|2018-06-15|2018-07-13|
| 3| C|2018-08-15|2018-10-09|
| 3| C|2018-12-03|2019-03-12|
| 3| C|2019-05-10|2019-06-07|
| 4| A|2019-01-30|2019-03-01|
| 4| A|2019-05-30|2019-07-25|
| 5| C|2018-09-19|2018-10-17|
+---+---+----------+----------+
>>> df.printSchema()
root
|-- id_: string (nullable = true)
|-- p: string (nullable = true)
|-- d1: date (nullable = true)
|-- d2: date (nullable = true)
d = df.select(F.min('d1').alias('start_date'), F.max('d2').alias('end_date')).first()
>>> d
Row(start_date=datetime.date(2018, 6, 15), end_date=datetime.date(2019, 7, 25))
cols = [ c.to_timestamp().date() for c in pd.period_range(d.start_date, d.end_date, freq='D') ]
>>> cols
[datetime.date(2018, 6, 15),
datetime.date(2018, 6, 16),
...
datetime.date(2019, 7, 23),
datetime.date(2019, 7, 24),
datetime.date(2019, 7, 25)]
使用列表理解迭代cols
,F.when(条件1)中的所有日期。否则(0)
设置列值,并str(c)
设置列名(别名):
result = df.select('id_', *[ F.when((df.d1 <= c)&(df.d2 >= c),1).otherwise(0).alias(str(c)) for c in cols ])
# check data in some columns
result.select('id_', str(d.start_date), '2019-01-01', str(d.end_date)).show()
+---+----------+----------+----------+
|id_|2018-06-15|2019-01-01|2019-07-25|
+---+----------+----------+----------+
| 1| 0| 0| 0|
| 2| 0| 0| 0|
| 2| 0| 0| 0|
| 2| 0| 1| 0|
| 2| 0| 0| 0|
| 3| 1| 0| 0|
| 3| 0| 0| 0|
| 3| 0| 1| 0|
| 3| 0| 0| 0|
| 4| 0| 0| 0|
| 4| 0| 0| 1|
| 5| 0| 0| 0|
+---+----------+----------+----------+
我用的是PySpark,我有一个Spark数据框架,里面有一堆数字列。我想添加一列,它是所有其他列的总和。 假设我的数据帧具有列“a”、“b”和“c”。我知道我能做到: 问题是,我不想单独键入每列并添加它们,尤其是如果我有很多列。我希望能够自动执行此操作,或者通过指定要添加的列名列表来执行此操作。有其他方法吗?
我有以下问题,因为我是pyspark的新手。基于来自两列的值的条件,我想填充第三列。条件是: < li >如果分号包含在col2中,请检查col1 < ul > < li >如果col1 == 1,则取分号前的值 < li >如果col1 == 2,则取分号后的值 这就是数据帧的样子。 我创建了下面的udf,它给出了错误“不能将列转换为布尔值:请使用” 我通过谷歌搜索各种功能构建了这个udf,所以
我有以下示例数据框架: 我只想替换前两列中的空值——列“a”和“b”: 这是创建示例数据帧的代码: 我知道如何使用替换所有空值: 当我尝试这样做时,我失去了第三列:
我想在一个组合框中填充一些数据。这很好。但是,我想动态地改变组合框中的数据。 目前我有一个表,在组合框中,我在表列中显示数据的唯一值。现在,表数据存储在静态可观察列表变量中。我想根据表中显示的数据更改组合框中的数据。也就是说,如果存储表数据的静态可观察列表发生更改,我希望在没有手动干预的情况下更改组合框数据。 如果没有单独的静态变量用于组合框,这是否可能?
问题内容: 如果我有一个包含多列的数据框,如何只填充一列?还是一组列? 我只知道如何按轴进行操作。 问题答案: tl; dr: 我还添加了一个自我包含的示例:
我有一个单列PySpark数据框。 我想使用像熊猫ffill()函数一样的前向填充来计算缺失值。 期望输出 免责声明:我在stackoverflow中有一些解决方案,但当您只有一列作为输入时,它们就不起作用了。