当前位置: 首页 > 知识库问答 >
问题:

Spark sql:如何在组级别实现数据帧的并行处理,但是在每个组中,我们需要对行进行顺序处理

越扬
2023-03-14
  1. 在数据帧上应用分组。让我们假设它产生了100组,每组10行
  2. 我有一个必须应用于每个组的函数。它可以以并行方式和任何顺序发生(即,由spark自行决定以任何顺序选择任何组执行)
  3. 但是对于in group,我需要保证对行进行顺序处理。因为在处理组中的每一行之后,我会在处理组中剩余的任何一行时使用输出

我们采用了以下方法,即所有内容都在驱动程序上运行,并且无法利用火花集群跨节点的并行处理(正如预期的那样,性能非常糟糕)

1) 将主DF分解为多个数据帧,放置在一个阵列中:

val securityIds = allocStage1DF.select("ALLOCONEGROUP").distinct.collect.flatMap(_.toSeq)
val bySecurityArray = securityIds.map(securityIds => allocStage1DF.where($"ALLOCONEGROUP" <=> securityIds))

2) 循环遍历数据帧,并从数据帧上方逐行传递给要处理的方法:

df.coalesce(1).sort($"PRIORITY" asc).collect().foreach({
  row => AllocOneOutput.allocOneOutput(row)}
)

我们正在寻找的是并行和顺序处理的结合。

>

  • 组级并行处理。因为,这些都是独立的组,可以并行。

    在每个组中,行必须按顺序依次处理,这对我们的用例非常重要。

    样本数据

    在安全ID、CC、BU、MPU上应用分组,从上面给我们两个分组(SECID_1、CC_A、BU_A、MPU_A和SECID_2、CC_A、BU_A、MPU_A)。

    在优先级矩阵的帮助下(只不过是一个参考表,用于将排名转换为行),我们将每个组转换为以下内容:

    转置数据

    上面组中的每一行都有一个优先级,并按此顺序排序。现在,我想通过将每一行传递给一个函数来一个接一个地处理它们,并获得如下输出:

    输出

    usecase详细说明:

    1. 基本数据框架包含金融公司的所有交易头寸数据。一些客户购买(长期)给定的金融产品(由securityId唯一标识),一些客户出售(短期)该产品
    2. 我们应用程序的目的是识别/配对给定securityId中的多头头寸和空头头寸
    3. 由于这种配对是在securityId中进行的,所以我们说基本数据帧基于此securityId被划分为多个组,每个组都可以独立处理
    4. 为什么我们要在一个组中寻找顺序处理?这是因为,当给定组中有许多多头仓位和许多空头仓位(如示例数据所示)时,参考表(优先级矩阵)决定哪个多头仓位必须与哪个空头仓位配对。基本上,它给出了处理顺序
    5. 第二个原因是,当给定的长数量和短数量不相等时,剩余数量可以配对。i、 例如,如果剩余的是长数量,则可以根据优先级将其与组中的下一个可用短数量配对,反之亦然
    6. 因为第4条提到的原因

    以上几点使用以下数据集进行描述。

    基本数据帧

        +-------------+----------+----------+------
    ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
    +-------------+----------+----------+------
      1|      secId|   Acc1|       +100|
      2|      secId|   Acc2|       -150|
      3|      secId|   Acc3|       -25|
      4|      secId2|   Acc3|       -25|
      5|      secId2|   Acc3|       -25|
    

    基本数据帧是按安全ID分组的。下面让我们使用secId组

    +-------------+----------+----------+------
    ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
    +-------------+----------+----------+------
      1|      secId|   Acc1|       +100|
      2|      secId|   Acc2|       -150|
      3|      secId|   Acc3|       -25|
    

    在上述情况下,100的正位可以与-50或-25配对。为了打破僵局,以下名为优先级矩阵的ref表通过定义顺序来提供帮助。

    +-------------+----------+----------+------
    +vePositionAccount|-vePositionAccount| RANK
    +-------------+----------+----------+------
      Acc1|            Acc3|              1|     
      Acc1|            Acc2|              2|  
    

    所以,从上面的矩阵中,我们知道第1行和第3行将首先配对,然后是第1行和第2行。这就是我们所说的顺序(顺序处理)。让我们现在将它们配对如下:

    +-------------+----------+----------+------+-------------+----------+----------+------
    +veROW_NO|+veSECURITY_ID| +veACCOUNT|+vePOSITION|+veROW_NO|+veSECURITY_ID| +veACCOUNT|+vePOSITION|
    +-------------+----------+----------+------+-------------+----------+----------+------
      1|      secId|    Acc1|       +100|      3|       secId|   Acc3|       -25|
      1|      secId|    Acc1|       +100|      2|       secId|   Acc2|       -150|
    

    当第1行在第2行之后处理时会发生什么?(这就是我们需要的)

    1.处理完第1行后,Acc1中的位置将为(100-25)=Acc3中的75个位置将为0。Acc1中更新的位置为75,现在将用于处理第二行。

    2.处理第2行后,Acc1中的位置将为0。Acc2中的位置将是(75-150) -75。

    结果数据帧:

    +-------------+----------+----------+------
    ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
    +-------------+----------+----------+------
      1|      secId|   Acc1|       0|
      2|      secId|   Acc2|       -75|
      3|      secId|   Acc3|       0|
    

    当第2行在第1行之后处理时会发生什么?(我们不想要这个)

    1. 处理完第2行后,Acc1中的位置将为0,Acc2中的位置将为(100-150)-50。Acc1中更新的位置为0,现在将用于处理第一行

    结果数据帧:

    +-------------+----------+----------+------
    ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
    +-------------+----------+----------+------
      1|      secId|   Acc1|       0|
      2|      secId|   Acc2|       -50|
      3|      secId|   Acc3|       -25|
    

    如上所述,在组中处理的顺序决定了我们的输出

    我还想问-为什么火花不支持数据帧中的顺序处理?我们说我们需要集群的并行处理能力。这就是为什么我们将数据帧分成几个组,并要求集群并行地在这些组上应用逻辑。我们所说的是,如果这个组有let说100行,那么让这100行在一个顺序中一个接一个地处理。这不是火花支持的吗?

    如果不是,那么大数据中还有什么其他技术可以帮助实现这一点?

    替代实施:

    1. 将数据帧划分为与组数相同的多个分区(在本例中为50000个分区。组数更多,但任何组中包含的行数不超过100个)
    2. 在数据帧上运行“ForeachPartition”操作,在该数据帧中,逻辑是跨分区独立执行的
    3. 将每个分区的处理结果写入集群
    4. 处理完整个数据帧后,一个单独的作业将从步骤3中读取这些单独的文件,并写入单个文件/数据帧

    我怀疑成千上万的分区是否有任何好处,但我想知道这种方法听起来是否不错。

  • 共有1个答案

    常自怡
    2023-03-14

    这个概念工作得很好,直到这条规则:

    这是因为您需要迭代,使用依赖逻辑循环,这很难使用更面向流的Spark进行编码。

    我还参与了一个项目,在该项目中,所有内容都是明确的——使用Spark、scala或pyspark在大数据中进行。作为一名架构师和程序员,我研究了一些类似于您所在区域的算法,但并不完全相同,在这些算法中,对于商品,一组数据点的所有周期都需要归类为牛市、熊市或非牛市。就像你的算法,但仍然不同,我不知道前面要做多少循环。事实上,我需要做一些事情,然后决定在我标记为牛市或熊市或什么都没有的时间段左右重复这些事情。需要终止条件。见下图。有点像“扁平”二叉树遍历,直到所有路径耗尽。不是那种火花。

    实际上,出于学术目的,我解决了我在Spark的具体情况,但这是一次学术练习。问题的关键是,这种类型的处理——我的例子和你的例子都不适合Spark。我们在ORACLE中进行了这些计算,并将结果简单地存储到Hadoop数据存储中。

    因此,我的建议是你不要在Spark中尝试这个,因为它不够适合用例。相信我,它会变得混乱。老实说,我很快就发现这种类型的处理是一个问题。但是当开始的时候,这是一个常见的查询方面。

     类似资料:
    • 问题内容: 我目前使用nodejs创建一些实验项目。我已经用Spring编写了很多Java EE Web应用程序,并赞赏那里的依赖注入的简易性。 现在我很好奇:如何使用节点进行依赖注入?或者:我什至需要吗?是否存在替代概念,因为编程风格不同? 到目前为止,我在谈论简单的事情,例如共享数据库连接对象,但是我还没有找到一个令我满意的解决方案。 问题答案: 简而言之,您不需要像C#/ Java中那样的依

    • 我有一个进程,它要求处理dataframe的每一行,然后向每一行追加一个新值。这是一个很大的数据帧,一次处理一个数据帧需要几个小时。 如果我有一个将每一行发送到一个函数的迭代罗循环,我可以并行处理以加快速度吗?行的结果不相关 基本上我的代码是这样的 有没有一种简单的方法可以这样做来加快处理速度?

    • 我有一个pandas数据帧像: 我想按第一列进行分组,并将第二列作为行中的列表:

    • 问题内容: 所以我有这张桌子。它有几百行。每行中都有一个日期时间字段。我需要完成的是获取给定时间段内有多少行,而不是整个时间段,而是该时间段的每一天。到此为止,我知道该怎么办。但是,此外,我还需要在表中没有值0的日期的行。 因此,例如: 应该给我这样的结果: 任何人都可以帮忙吗? 问题答案: 为了处理带有0条对应记录的日期,我的常规做法是使用日历表进行联接。 例如,创建一个表,其中一个字段称为,并

    • 问题内容: 我有一个这样的数据框: 我要 然后然后为每个pidx 然后是每个组的前2名。 我正在寻找的结果是这样的: 我试过的是: 这似乎可行,但我不知道如果处理庞大的数据集,这是否是正确的方法。我还能使用什么其他最佳方法来获得这种结果? 问题答案: 有两种解决方案: 1.和合计: 2.和合计: 时间 :

    • 问题内容: 我有多个数组,我想根据其中一个的排序顺序对所有数组进行排序,如下所示: 我希望函数执行后,数组将如下所示: 问题答案: 您可以执行以下操作:首先根据键控数组的索引的索引对它们进行索引的值对它们进行排序,然后使用: 如果要在任何类型的集合上使它通用(但仍以与std lib集合算法相同的样式返回数组): 以及带有自定义比较器的版本: