当前位置: 首页 > 面试题库 >

PySpark:计算列子集的行最大值,并添加到现有数据框中

白烨煜
2023-03-14
问题内容

我想为每一行计算最大列子集,并将其添加为现有列的新列Dataframe

我以非常尴尬的方式做到了这一点:

def add_colmax(df,subset_columns,colnm):
     '''
     calculate the maximum of the selected "subset_columns" from dataframe df for each row, 
     new column containing row wise maximum is added to dataframe df.

     df: dataframe. It must contain subset_columns as subset of columns
     colnm: Name of the new column containing row-wise maximum of subset_columns
     subset_columns: the subset of columns from w
     '''
     from pyspark.sql.functions import monotonicallyIncreasingId
     from pyspark.sql import Row
     def get_max_row_with_None(row):
         return float(np.max(row))

     df_subset = df.select(subset_columns)
     rdd = df_subset.map( get_max_row_with_None)
     df_rowsum = rdd.map(Row(colnm)).toDF()
     df_rowsum = df_rowsum.withColumn("id",monotonicallyIncreasingId())
     df = df.withColumn("id",monotonicallyIncreasingId())
     df = df.join(df_rowsum,df.id == df_rowsum.id).drop(df.id).drop(df_rowsum.id)
     return df

该功能的工作原理是:

rdd1 =  sc.parallelize([("foo", 1.0,3.0,None), 
                    ("bar", 2.0,2.0,-10), 
                    ("baz", 3.3,1.2,10.0)])


df1 = sqlContext.createDataFrame(rdd1, ('v1', 'v2','v3','v4'))
df_new = add_colmax(df1,['v2','v3','v4'],"rowsum")   
df_new.collect()

返回:

 [Row(v1=u'bar', v2=2.0, v3=2.0, v4=-10, rowsum=2.0),
  Row(v1=u'baz', v2=3.3, v3=1.2, v4=None, rowsum=3.3),
  Row(v1=u'foo', v2=1.0, v3=3.0, v4=None, rowsum=3.0)]

我认为,如果可以使用带有的用户定义函数withColumn,则可以更简单地完成。但是我不知道该怎么做。如果您有更简单的方法来实现这一目标,请告诉我。我正在使用Spark
1.6


问题答案:

让我们从几个导入开始

from pyspark.sql.functions import col, lit, coalesce, greatest

接下来定义负无穷大字面量:

minf = lit(float("-inf"))

映射列并将结果传递给greatest

rowmax = greatest(*[coalesce(col(x), minf) for x in ['v2','v3','v4']])

最后withColumn

df1.withColumn("rowmax", rowmax)

结果:

+---+---+---+----+------+
| v1| v2| v3|  v4|rowmax|
+---+---+---+----+------+
|foo|1.0|3.0|null|   3.0|
|bar|2.0|2.0| -10|   2.0|
|baz|3.3|1.2|null|   3.3|
+---+---+---+----+------+

您可以将相同的模式用于不同的按行操作,minf以中性元素代替。例如:

rowsum = sum([coalesce(col(x), lit(0)) for x in ['v2','v3','v4']])

要么:

from operator import mul
from functools import reduce

rowproduct = reduce(
  mul, 
  [coalesce(col(x), lit(1)) for x in ['v2','v3','v4']]
)

您自己的代码可以通过以下方式大大简化udf

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

def get_max_row_with_None_(*cols):
    return float(max(x for x in cols if x is not None))

get_max_row_with_None = udf(get_max_row_with_None_, DoubleType())
df1.withColumn("rowmax", get_max_row_with_None('v2','v3','v4'))

更换minflit(float("inf"))greatestleast获得每行的最小值。



 类似资料:
  • 问题内容: 我在pyspark中使用sqlContext.sql函数读取了一个数据框。它包含4个数字列,每个客户都有信息(这是键ID)。我需要计算每个客户端的最大值并将此值加入数据框: 在这种情况下,客户端“ six”的最大值为23,而客户端“ ten”的最大值为30。“ null”在新列中自然为null。 请帮助我显示如何执行此操作。 问题答案: 我认为将值组合到列表中而不是找到最大值将是最简单

  • 我需要在pyspark数据框中使用窗口上的max date行中的列值创建一个新列。鉴于下面的数据框架,我需要根据最近日期的调整系数为每个资产的每个记录设置一个名为max_adj_factor的新列。

  • 问题内容: 我有一个OHLC价格数据集,该数据集已从CSV解析为Pandas数据框,并重新采样为15分钟的柱形: 我想添加各种计算的列,从简单的列开始,例如期间范围(HL),然后是布尔值以指示我将定义的价格模式的出现-例如锤形蜡烛模式,为其定义示例: 基本问题:如何将函数映射到列,特别是在我想引用多个其他列或整行或其他内容的地方? 这篇文章处理从单个源列添加两个计算列,这是很接近的,但还不完全是。

  • 我有一个pyspark数据框,在这里我可以找到每列的最小/最大值和最小/最大值计数。我可以使用: 我希望在同一数据帧中也有最小/最大值的计数。我需要的具体输出: …|col|n|col|m| …|xn | xm |。。。最小值(col(coln)) 计数(col_n==xn)|计数(col_m==xm)|。。。

  • 给定一个非负整数数组,设计最简单的算法来找到最大大小的子数组,并将其加到最小的值。 我的想法是,因为它们是非负整数,所以和最小的数组总是单个单元数组,只有原始数组的最小值。如果我理解正确的话,它取决于什么具有更高的优先级,具有更高的长度或更小的值。然而,这个问题从来没有明确说明哪一个优先。 我在这个问题上是正确的,还是我遗漏了什么?

  • 问题内容: 我有一个现有的数据框,我需要添加一个额外的列,每行将包含相同的值。 现有的df: 新的df: 我知道如何追加现有的series / dataframe列。但这是另一种情况,因为我所需要的只是添加“名称”列,并将每一行设置为相同的值,在本例中为“ abc”。 问题答案: 将添加新列并将所有行设置为该值: