当前位置: 首页 > 知识库问答 >
问题:

Apache Spark查找数据帧中第一个不同的前行

子车安和
2023-03-14

我有以下格式的Apache Spark数据帧

| ID |  groupId  | phaseName |
|----|-----------|-----------|
| 10 | someHash1 | PhaseA    |
| 11 | someHash1 | PhaseB    |
| 12 | someHash1 | PhaseB    |
| 13 | someHash2 | PhaseX    |
| 14 | someHash2 | PhaseY    |

我想在DataFrame中添加一个新列:PreviousPhaseName。此列应指示同一过程的前一个不同阶段。进程的第一阶段(具有最小ID的阶段)将与前一阶段一样具有null。当一个阶段发生两次或两次以上时,第二次(第三次...)事件将具有相同的previousPhaseName,例如:

df = 
| ID |  groupId  | phaseName | prevPhaseName |
|----|-----------|-----------|---------------|
| 10 | someHash1 | PhaseA    | null          |
| 11 | someHash1 | PhaseB    | PhaseA        |
| 12 | someHash1 | PhaseB    | PhaseA        |
| 13 | someHash2 | PhaseX    | null          |
| 14 | someHash2 | PhaseY    | PhaseX        |

我不确定如何实施这一点。我的第一个方法是:

  • 创建第二个空数据帧DF2
  • 对于DF中的每一行:
    查找具有groupId=row.groupId、ID 的行
  • 将此行添加到DF2
  • 连接df1和DF2
WindowSpec windowSpecPrev = Window
  .partitionBy(df.col("groupId"))
  .orderBy(df.col("ID"));
WindowSpec windowSpecCount = Window
  .partitionBy(df.col("groupId"), df.col("phaseName"))
  .orderBy(df.col("ID"))
  .rowsBetween(Long.MIN_VALUE, 0);

df
  .withColumn("prevPhase", functions.lag("phaseName", 1).over(windowSpecPrev))
  .withColumn("phaseCount", functions.count("phaseId").over(windowSpecCount))
  .withColumn("prevSame", when(col("prevPhase").equalTo(col("phaseName")),1).otherwise(0))

df = 
| ID |  groupId  | phaseName | prevPhase   | phaseCount | prevSame |
|----|-----------|-----------|-------------|------------|----------|
| 10 | someHash1 | PhaseA    | null        |  1         |  0       |
| 11 | someHash1 | PhaseB    | PhaseA      |  1         |  0       |
| 12 | someHash1 | PhaseB    | PhaseB      |  2         |  1       |
| 13 | someHash2 | PhaseX    | null        |  1         |  0       |
| 14 | someHash2 | PhaseY    | PhaseX      |  1         |  0       |
  • 实现自己的lag函数,该函数不取偏移量,但递归检查前一行,直到找到与给定行不同的值。(虽然我认为在Spark SQL中不可能使用自己的分析窗口函数)
  • 找到一种方法,根据phasecount的值动态设置lag函数的偏移量。(如果以前出现的相同相位名未出现在单个序列中,则该操作可能失败)
  • 在窗口上使用UserDefinedAggregateFunction,该窗口存储第一个给定输入的ID和phaseName,并使用不同的phaseName查找最高ID。

共有1个答案

姬翰林
2023-03-14

我能够通过以下方式解决这个问题:

  1. 获取(普通)前一阶段。
  2. 引入一个新的id,对按顺序发生的阶段进行分组。(借助这个答案)。这需要两个步骤。首先检查当前和以前的阶段名称是否相等,并相应地分配一个groupCount值。第二次计算该值的累计和。
  3. 将顺序组第一行的前一个阶段分配给其所有成员。
WindowSpec specGroup = Window.partitionBy(col("groupId"))  
                             .orderBy(col("ID"));
WindowSpec specSeqGroupId = Window.partitionBy(col("groupId")) 
                                  .orderBy(col("ID"))
                                  .rowsBetween(Long.MIN_VALUE, 0);
WindowSpec specPrevDiff = Window.partitionBy(col("groupId"), col("seqGroupId"))
                                .orderBy(col("ID"))
                                .rowsBetween(Long.MIN_VALUE, 0);

df.withColumn("prevPhase", coalesce(lag("phaseName", 1).over(specGroup), lit("NO_PREV"))) 
  .withColumn("seqCount", when(col("prevPhase").equalTo(col("phaseName")).or(col("prevPhase").equalTo("NO_PREV")),0).otherwise(1))
  .withColumn("seqGroupId", sum("seqCount").over(specSeqGroupId))
  .withColumn("prevDiff", first("prevPhase").over(specPrevDiff));
df = 
| ID |  groupId  | phaseName | prevPhase | seqCount | seqGroupId | prevDiff |
|----|-----------|-----------|-----------|----------|------------|----------|
| 10 | someHash1 | PhaseA    | NO_PREV   |  0       |  0         | NO_PREV  |
| 11 | someHash1 | PhaseB    | PhaseA    |  1       |  1         | PhaseA   |
| 12 | someHash1 | PhaseB    | PhaseA    |  0       |  1         | PhaseA   |
| 13 | someHash2 | PhaseX    | NO_PREV   |  0       |  0         | NO_PREV  |
| 14 | someHash2 | PhaseY    | PhaseX    |  1       |  1         | PhaseX   |

任何建议,特别是在这些行动的效率方面的建议,都是赞赏的。

 类似资料:
  • 我有两个Pandas Dataframe和,其中是的一部分,我想创建一个Dataframe,其中包含中的code>。 以下是一个例子: 注: 我的DataFrame可能有多个列,但是必须仅在列上进行匹配。

  • 我有两个数据帧,它们的列名相同,但行数不同。第一个数据帧(a)看起来与此类似: 注:站点5、6、8和12故意丢失。 第二个数据帧(b)看起来像这样: 我想要实现的是: 在那里我注入(我肯定有一个更好的术语)数据帧b到数据帧a的数据,但是我想用零替换b中的任何NAs,并保持a中的NAs不变。 我发现并尝试了这个代码: 但它会带来NAs。我考虑先将NAs替换为零,但即使如此,它也会抹去我目前在数据帧a

  • 我有两个不同列数和行数的CSV文件。第一个CSV文件有M列和N行,第二个文件有H列和G行。一些列具有相同的名称。 null 另外,如果两个CSV文件有两个数据帧,并希望这样做,例如,如果我将第一个CSV加载到中,将第二个加载到中,然后希望合并到,类似于上面的示例。

  • 我和Spark一起在Databricks上工作。编程语言是Scala。 我有两个数据帧: 主数据框:见截图:1 查找数据帧:参见屏幕截图3 我想: 查找主数据框中“年龄”=-1的所有行 我对如何做这件事伤了脑筋。我唯一想到的是将dataframe存储为DataRicks中的表,并使用SQL语句(SQL.Context.SQL…),结果非常复杂。 我想知道是否有更有效的方法。 编辑:添加可复制的示例

  • 我有两个数据帧,希望比较它们并返回第一个数据帧(df1)中不在第二个数据帧(df2)中的行。我找到了一种比较它们并返回差异的方法,但无法找出如何只返回df1中缺失的部分。

  • 我试图基于同一数据帧的另一行向数据帧添加列。我想查找第一列值等于第5列中的值的行,并将第2、3和4列附加到dataframe,如下所示 我尝试创建另一个表来合并以各种组合,但没有运气。