当前位置: 首页 > 知识库问答 >
问题:

基于数据库上另一个pyspark数据框的某些列,在大型pyspark数据框的列上执行用户定义的函数

宇文念
2023-03-14

我的问题与上一个问题相关,即如何有效地连接大型pyspark数据帧和小型python列表,以获得数据块上的一些NLP结果。

我已经解决了一部分,现在被另一个问题卡住了。

我有一个小型pyspark数据帧,比如:

  df1: 

   +-----+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
   |topic|                                       termIndices|                                       termWeights|                                             terms|
   +-----+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
   |    0|      [3, 155, 108, 67, 239, 4, 72, 326, 128, 189]|[0.023463344607734377, 0.011772322769900843, 0....|[cell, apoptosis, uptake, loss, transcription, ...|
   |    1|      [16, 8, 161, 86, 368, 153, 18, 214, 21, 222]|[0.013057307487199429, 0.011453455929929763, 0....|[therapy, cancer, diet, lung, marker, sensitivi...|
   |    2|            [0, 1, 124, 29, 7, 2, 84, 299, 22, 90]|[0.03979063871841061, 0.026593954837078836, 0.0...|[group, expression, performance, use, disease, ...|
   |    3|   [204, 146, 74, 240, 152, 384, 55, 250, 238, 92]|[0.009305626056223443, 0.008840730657888991, 0....|[pattern, chemotherapy, mass, the amount, targe...|

它只有不到 100 行,而且非常小。每个术语在“术语权重”列中都有一个术语权重值。

我还有另一个大型 pyspark 数据帧 (50 GB),如下所示:

  df2: 
  +------+--------------------------------------------------+
  |r_id|                                    tokens|
  +------+--------------------------------------------------+
  |     0|[The human KCNJ9, Kir, GIRK3, member, potassium...|
  |     1|[BACKGROUND, the treatment, breast, cancer, the...|
  |     2|[OBJECTIVE, the relationship, preoperative atri...|

对于df2中的每一行,我需要在所有主题中找到df1中具有最高术语权重的最佳匹配术语。

最后,我需要一个像

 r_id tokens topic (the topic in df1 that has the highest sum of termWeights among all topics)

我已经定义了一个UDF(基于df2),但它不能访问df1的列。我正在考虑如何为df1和df2使用“交叉连接”,但我不需要将df2的每一行与df1的每一行连接起来。我只需要保留df2的所有列,并根据每个df1的主题与每个df2的行的术语的匹配项添加一列,该列是术语权重之和最高的“主题”。

我不确定pyspark.sql.functions.udf怎么实现这个逻辑。

共有1个答案

郁烨
2023-03-14

IIUC,您可以尝试如下操作(我将处理流程分为 4 个步骤,需要 Spark 2.4):

步骤1:将所有df2.tokens转换为小写,以便我们可以进行文本比较:

from pyspark.sql.functions import expr, desc, row_number, broadcast

df2 = df2.withColumn('tokens', expr("transform(tokens, x -> lower(x))"))

步骤2:使用arrays_overlap将df2与df1左连接

df3 = df2.join(broadcast(df1), expr("arrays_overlap(terms, tokens)"), "left")

步骤3:使用聚合函数根据术语,术语权重和令牌计算matched_sum_of_weights

df4 = df3.selectExpr(
    "r_id",
    "tokens",
    "topic",
    """
      aggregate(
        /* find all terms+termWeights which are shown in tokens array */
        filter(arrays_zip(terms,termWeights), x -> array_contains(tokens, x.terms)),
        0D,
        /* get the sum of all termWeights from the matched terms */
        (acc, y) -> acc + y.termWeights
      ) as matched_sum_of_weights
    """)

步骤4:对于每个r_id,使用Window函数查找具有最高matched_sum_of_weights的行,并仅保留具有row_number==1的行

from pyspark.sql import Window
w1 = Window.partitionBy('r_id').orderBy(desc('matched_sum_of_weights'))

df_new = df4.withColumn('rn', row_number().over(w1)).filter('rn=1').drop('rn', 'matched_sum_of_weights')

备选方案:如果df1的大小不是很大,这可以在没有join/window.partition等的情况下处理。以下代码仅概述了您应该根据实际数据进行改进的想法:

from pyspark.sql.functions import expr, when, coalesce, array_contains, lit, struct

# create a dict from df1 with topic as key and list of termWeights+terms as value
d = df1.selectExpr("string(topic)", "arrays_zip(termWeights,terms) as terms").rdd.collectAsMap()

# ignore this if text comparison are case-sensitive, you might do the same to df1 as well
df2 = df2.withColumn('tokens', expr("transform(tokens, x -> lower(x))"))

# save the column names of the original df2
cols = df2.columns

# iterate through all items of d(or df1) and update df2 with new columns from each 
# topic with the value a struct containing `sum_of_weights`, `topic` and `has_match`(if any terms is matched)
for x,y in d.items():
  df2 = df2.withColumn(x,
      struct(
        sum([when(array_contains('tokens', t.terms), t.termWeights).otherwise(0) for t in y]).alias('sum_of_weights'),
        lit(x).alias('topic'),
        coalesce(*[when(array_contains('tokens', t.terms),1) for t in y]).isNotNull().alias('has_match')
      )
  )

# create a new array containing all new columns(topics), and find array_max
# from items with `has_match == true`, and then retrieve the `topic` field
df_new = df2.selectExpr(
    *cols,
    f"array_max(filter(array({','.join(map('`{}`'.format,d.keys()))}), x -> x.has_match)).topic as topic"
)
 类似资料:
  • 我有两个pyspark数据帧 DF1 : df2: 我想向df1添加一个列Location_Id,从df2获取匹配的Id,如下所示: 我如何才能做到这一点?

  • 问题内容: 我有一个像这样的简单数据框: 我需要按日期进行调整: 一切正常。但是现在我需要对其进行透视,并获得一个非数字列: 当然,我会得到一个例外: 我想产生一些东西 有可能吗? 问题答案: 假设组合是唯一的,并且您的唯一目标是枢纽而不是合计,则可以使用(或任何其他不限于数值的函数): 如果这些假设不正确,则必须预先汇总数据。例如,对于最常见的值:

  • 问题内容: 我在python / pyspark中有一个带有列的数据框 ,依此类推...... 现在,我在此数据框中添加了新列。 现在,我必须安排这样的列的列来后 我已经完成如下 我收到此错误 为什么会发生此错误。我该如何纠正。 问题答案: 您可以用来更改列的顺序:

  • 问题内容: 我有这样的数据我想创建一个PySpark数据框 我已经用过 但我总是得到 我如何获得如下所示的结果? 问题答案: 我发现将参数视为元组列表很有用,其中列表中的每个条目对应于DataFrame中的一行,而元组的每个元素对应于一列。 您可以通过将列表中的每个元素设为元组来获得所需的输出: 或者,如果更改源很麻烦,则可以等效地执行以下操作:

  • 问题内容: 我有两个数据框,第一个有1000行,看起来像: 该列具有不同的值,有时会重复,但通常大约有50个唯一值。 第二个数据框包含所有这50个唯一值(50行)以及与这些值关联的酒店: 我的目标是用第二个数据帧的列的相应值替换第一个数据帧的列中的值,或者用相应的值创建该列。当我尝试通过像 我有一个错误,即数据帧的大小不相等,因此无法进行比较 问题答案: 如果将索引设置为另一个df上的“组”列,则

  • 问题内容: 我有两个Spark数据框: 数据框A: 和数据框B: 数据框B可以包含来自数据框A的重复行,更新行和新行。我想在spark中编写操作,在其中可以创建一个新数据框,其中包含数据框A的行以及数据框B的更新行和新行。 我首先创建一个仅包含不可更新列的哈希列。这是唯一的ID。所以我们可以说,并可以改变值(可更新),但是是唯一的。我创建了一个哈希函数为: 现在,我想编写一些火花代码,基本上从B中