Question:

A generic "reduceBy" or "groupBy aggregate" function for Spark DataFrames

长孙燕七
2023-03-14

Code review question: a generic "reduce by" or "group by and aggregate" function for Spark DataFrames

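OK, folks. Maybe I have completely reinvented the wheel here, or maybe I have invented something useful. Can any of you tell me whether there is a better way to do this? Here is what I am trying to do: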

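I want a generic reduceBy function that works like an RDD's reduceByKey but lets me group by any column of a Spark DataFrame. You may say we already have that and it is called groupBy, but as far as I can tell, groupBy only lets you aggregate with a very limited set of options. I want to groupBy and then run an arbitrary function to do the aggregation. Has anyone already done this?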

Basically, I am taking a Spark DataFrame that looks like this...

+----------+---------+-----+-------------+------------+-------------------+
| birthdate|favecolor| name|twitterhandle|facebookpage|           favesong|
+----------+---------+-----+-------------+------------+-------------------+
|2000-01-01|     blue|Alice|     allyblue|        null|               null|
|1999-12-31|     null|  Bob|         null|      BobbyG| Gangsters Paradise|
|      null|     null|Alice|         null|        null|Rolling in the Deep|
+----------+---------+-----+-------------+------------+-------------------+

...and reducing it by the "name" column to get this:

+----------+---------+-------------------+-----+-------------+------------+
| birthdate|favecolor|           favesong| name|twitterhandle|facebookpage|
+----------+---------+-------------------+-----+-------------+------------+
|2000-01-01|     blue|Rolling in the Deep|Alice|     allyblue|        null|
|1999-12-31|     null| Gangsters Paradise|  Bob|         null|      BobbyG|
+----------+---------+-------------------+-----+-------------+------------+

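I just noticed that the column order changed. I think I can fix that quickly by recording the schema before I start. Either way, I had to write a lot of code to get this working, and it seems like such a simple operation that someone else should have done it by now.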

Here is the code, written with Python 3.5.1 and Spark 1.5.2:

 def addEmptyColumns(df, colNames):
     """
     https://lab.getbase.com/pandarize-spark-dataframes/

     :param df: 
     :param colNames: 
     :return:
     """
     exprs = df.columns + ["null as " + colName for colName in colNames]
     return df.selectExpr(*exprs)


 def concatTwoDfs(left, right):
     """
     https://lab.getbase.com/pandarize-spark-dataframes/

     :param left: 
     :param right: 
     :return:
     """
     # append columns from right df to left df
     missingColumnsLeft = set(right.columns) - set(left.columns)
     left = addEmptyColumns(left, missingColumnsLeft)

     # append columns from left df to right df
     missingColumnsRight = set(left.columns) - set(right.columns)
     right = addEmptyColumns(right, missingColumnsRight)

     # let's set the same order of columns
     right = right[left.columns]

     # finally, union them
     return left.unionAll(right)


 def reduce(function, iterable, initializer=None):
     """
     A copy of the rough code from Python 2's reduce function documentation.  Why did Python 3 get rid of it?

     Apply function of two arguments cumulatively to the items of iterable, from left to right, so as to reduce the
     iterable to a single value. For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates ((((1+2)+3)+4)+5).
     The left argument, x, is the accumulated value and the right argument, y, is the update value from the iterable.
     If the optional initializer is present, it is placed before the items of the iterable in the calculation, and
     serves as a default when the iterable is empty. If initializer is not given and iterable contains only one item,
     the first item is returned.

     :param function: use this function to reduce the elements of iterable
     :param iterable:
     :param initializer:
     :return:
     """
     it = iter(iterable)
     if initializer is None:
         try:
             initializer = next(it)
         except StopIteration:
             raise TypeError('reduce() of empty sequence with no initial value')
     accum_value = initializer
     for x in it:
         accum_value = function(accum_value, x)
     return accum_value


 def concat(dfs):
     """
     Concatenates two Spark dataframes intelligently, adding missing columns with 'null' entry where appropriate.
     https://lab.getbase.com/pandarize-spark-dataframes/

     :param dfs: a list or tuple of two Spark dataframes
     :return: single dataframe consisting of dfs' columns and data
     """
     return reduce(concatTwoDfs, dfs)


 def combine_rows(row1, row2):
     """
     Takes two rows assumed to have the same columns, combines them, using values from row1 when available, from row2
     otherwise.

     :param row1: pyspark.sql.Row
     :param row2: pyspark.sql.Row
     :return: pyspark.sql.Row combined from row1 and row2
     """
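     from pyspark.sql import Row
     # convert each Row to a dict once instead of on every lookup
     d1, d2 = row1.asDict(), row2.asDict()
     combined = {}
     for col in d1:
         combined[col] = d1[col] if d1[col] is not None else d2[col]
     # in Spark versions before 3.0, Row(**kwargs) sorts its fields by name, so column order may change
     return Row(**combined)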


 def remove_nones(row):
     """
     Takes in a row, returns that same row minus all of the columns that have a None entry.  This is required in
     order to create a new DataFrame using only this row; DataFrame will not be created if it doesn't know what kind
     of value to expect in a column.

     :param row:
     :return:
     """
     from pyspark.sql import Row
     # keep only the columns whose value is not None
     cleaned = {name: value for name, value in row.asDict().items() if value is not None}
     return Row(**cleaned)


 def reduce_by(df, col, func):
     """
     Does pretty much the same thing as an RDD's reduceByKey, but much more generic.  Kind of like a Spark DataFrame's
     groupBy, but lets you aggregate by any generic function.

     :param df: the DataFrame to be reduced
     :param col: the column you want to use for grouping in df
     :param func: the function you will use to reduce df
     :return: a reduced DataFrame
     """
     # relies on the global sc and sqlContext provided by the pyspark shell
     unique_entries = df.select(col).distinct().collect()
     return_df = None
     for entry in unique_entries:
         # reduce all rows for this key to a single Row, then wrap it in a one-row DataFrame
         reduced_row = remove_nones(df.filter(df[col] == entry[0]).rdd.reduce(func))
         entry_df = sqlContext.createDataFrame(sc.parallelize([reduced_row]))
         return_df = entry_df if return_df is None else concat((return_df, entry_df))
     return return_df
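
On the question in the reduce docstring above: Python 3 did not drop reduce, it moved it into the functools module, so the hand-written copy can probably be replaced by an import. A minimal sketch (the variable names are illustrative only):

```python
from functools import reduce

# Same example as in the docstring: ((((1 + 2) + 3) + 4) + 5)
total = reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])

# With an initializer, an empty iterable returns the initializer instead of raising
with_initial = reduce(lambda x, y: x + y, [], 0)
```

With that import in place, concat(dfs) could keep calling reduce(concatTwoDfs, dfs) unchanged.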

You can create a DataFrame named test_df and then run the following:

reduce_by(test_df, 'name', combine_rows).show()
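
For comparison, the same reduction can probably be done in one pass by keying the underlying RDD on the grouping column and calling reduceByKey, instead of filtering the DataFrame once per distinct key. The following is only a sketch under assumptions: reduce_by_key and combine_dicts are hypothetical names, ctx stands for whatever object provides createDataFrame (the sqlContext above, for example), and func is assumed to take and return plain dicts rather than Row objects.

```python
def combine_dicts(left, right):
    """Merge two row dicts with the same keys, preferring non-None values from left."""
    return {k: (left[k] if left[k] is not None else right[k]) for k in left}


def reduce_by_key(ctx, df, key, func):
    """Hypothetical alternative to reduce_by, built on RDD.reduceByKey.

    ctx  -- object with createDataFrame (assumed: a SQLContext or SparkSession)
    func -- function of two row dicts that returns one row dict
    """
    columns = df.columns  # remember the original column order
    schema = df.schema    # reuse the schema so all-None columns keep their type

    def to_pair(row):
        d = row.asDict()
        return (d[key], d)

    reduced = (df.rdd
                 .map(to_pair)
                 .reduceByKey(func)
                 # rebuild plain tuples in the original column order
                 .map(lambda kv: tuple(kv[1][c] for c in columns)))
    return ctx.createDataFrame(reduced, schema)
```

Under those assumptions the call would look like reduce_by_key(sqlContext, test_df, 'name', combine_dicts).show(). Reusing df.schema is meant to keep the original column order and to make remove_nones unnecessary, since the column types no longer have to be inferred from the data.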

1 answer

萧德庸
2023-03-14

I think this would also do the trick for your particular aggregation needs:

from pyspark.sql import SQLContext

# sc is the SparkContext provided by the pyspark shell
sqlContext = SQLContext(sc)

data = sc.parallelize([
    ("2000-01-01", "blue", "Alice", "allyblue", None, None),
    ("1999-12-31", None, "Bob", None, "BobbyG", "Gangsters Paradise"),
    (None, None, "Alice", None, None, "Rolling in the Deep"),
])

df = sqlContext.createDataFrame(
    data, ["birthdate", "favecolor", "name", "twitterhandle", "facebookpage", "favesong"])

# aggregate every non-key column with min, grouped by name
df = df.groupBy(df.name).agg({'birthdate': 'min', 'favecolor': 'min',
                              'twitterhandle': 'min', 'facebookpage': 'min', 'favesong': 'min'})
print(df.collect())

[Row(name=u'Alice', min(favesong)=u'Rolling in the Deep',
min(twitterhandle)=u'allyblue', min(favecolor)=u'blue', 
min(facebookpage)=u'null', min(birthdate)=u'2000-01-01'), Row(name=u'Bob',
min(favesong)=u'Gangsters Paradise', min(twitterhandle)=u'null', 
min(favecolor)=u'null', min(facebookpage)=u'BobbyG', min(birthdate)=u'1999-12-31')]
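
A variation on the same groupBy idea, offered only as a sketch: on a newer Spark than the 1.5.2 used in the question (the ignorenulls argument of first is, as far as I know, not available there), each column could take its first non-null value per group, and a final select could restore the original column order and names. value_columns and first_non_null_by are hypothetical helper names, not part of Spark.

```python
def value_columns(columns, key):
    """Columns to aggregate: everything except the grouping key, order preserved."""
    return [c for c in columns if c != key]


def first_non_null_by(df, key):
    """Hypothetical helper: first non-null value of every column within each group."""
    # imported lazily so value_columns can be used without Spark installed
    from pyspark.sql import functions as F

    aggs = [F.first(F.col(c), ignorenulls=True).alias(c)
            for c in value_columns(df.columns, key)]
    # the final select puts the columns back in their original order
    return df.groupBy(key).agg(*aggs).select(*df.columns)
```

Note that first depends on row order, which is not guaranteed after a shuffle, so when a group holds several non-null values for a column the one that is kept may vary between runs.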