我有以下格式的Apache Spark数据帧
| ID | groupId | phaseName |
|----|-----------|-----------|
| 10 | someHash1 | PhaseA |
| 11 | someHash1 | PhaseB |
| 12 | someHash1 | PhaseB |
| 13 | someHash2 | PhaseX |
| 14 | someHash2 | PhaseY |
我想在DataFrame中添加一个新列:PreviousPhaseName。此列应指示同一过程的前一个不同阶段。进程的第一阶段(具有最小ID的阶段)将与前一阶段一样具有null
。当一个阶段发生两次或两次以上时,第二次(第三次...)事件将具有相同的previousPhaseName,例如:
df =
| ID | groupId | phaseName | prevPhaseName |
|----|-----------|-----------|---------------|
| 10 | someHash1 | PhaseA | null |
| 11 | someHash1 | PhaseB | PhaseA |
| 12 | someHash1 | PhaseB | PhaseA |
| 13 | someHash2 | PhaseX | null |
| 14 | someHash2 | PhaseY | PhaseX |
我不确定如何实施这一点。我的第一个方法是:
WindowSpec windowSpecPrev = Window
.partitionBy(df.col("groupId"))
.orderBy(df.col("ID"));
WindowSpec windowSpecCount = Window
.partitionBy(df.col("groupId"), df.col("phaseName"))
.orderBy(df.col("ID"))
.rowsBetween(Long.MIN_VALUE, 0);
df
.withColumn("prevPhase", functions.lag("phaseName", 1).over(windowSpecPrev))
.withColumn("phaseCount", functions.count("phaseId").over(windowSpecCount))
.withColumn("prevSame", when(col("prevPhase").equalTo(col("phaseName")),1).otherwise(0))
df =
| ID | groupId | phaseName | prevPhase | phaseCount | prevSame |
|----|-----------|-----------|-------------|------------|----------|
| 10 | someHash1 | PhaseA | null | 1 | 0 |
| 11 | someHash1 | PhaseB | PhaseA | 1 | 0 |
| 12 | someHash1 | PhaseB | PhaseB | 2 | 1 |
| 13 | someHash2 | PhaseX | null | 1 | 0 |
| 14 | someHash2 | PhaseY | PhaseX | 1 | 0 |
lag
函数,该函数不取偏移量,但递归检查前一行,直到找到与给定行不同的值。(虽然我认为在Spark SQL中不可能使用自己的分析窗口函数)phasecount
的值动态设置lag
函数的偏移量。(如果以前出现的相同相位名未出现在单个序列中,则该操作可能失败)UserDefinedAggregateFunction
,该窗口存储第一个给定输入的ID和phaseName,并使用不同的phaseName查找最高ID。我能够通过以下方式解决这个问题:
WindowSpec specGroup = Window.partitionBy(col("groupId"))
.orderBy(col("ID"));
WindowSpec specSeqGroupId = Window.partitionBy(col("groupId"))
.orderBy(col("ID"))
.rowsBetween(Long.MIN_VALUE, 0);
WindowSpec specPrevDiff = Window.partitionBy(col("groupId"), col("seqGroupId"))
.orderBy(col("ID"))
.rowsBetween(Long.MIN_VALUE, 0);
df.withColumn("prevPhase", coalesce(lag("phaseName", 1).over(specGroup), lit("NO_PREV")))
.withColumn("seqCount", when(col("prevPhase").equalTo(col("phaseName")).or(col("prevPhase").equalTo("NO_PREV")),0).otherwise(1))
.withColumn("seqGroupId", sum("seqCount").over(specSeqGroupId))
.withColumn("prevDiff", first("prevPhase").over(specPrevDiff));
df =
| ID | groupId | phaseName | prevPhase | seqCount | seqGroupId | prevDiff |
|----|-----------|-----------|-----------|----------|------------|----------|
| 10 | someHash1 | PhaseA | NO_PREV | 0 | 0 | NO_PREV |
| 11 | someHash1 | PhaseB | PhaseA | 1 | 1 | PhaseA |
| 12 | someHash1 | PhaseB | PhaseA | 0 | 1 | PhaseA |
| 13 | someHash2 | PhaseX | NO_PREV | 0 | 0 | NO_PREV |
| 14 | someHash2 | PhaseY | PhaseX | 1 | 1 | PhaseX |
任何建议,特别是在这些行动的效率方面的建议,都是赞赏的。
我有两个Pandas Dataframe和,其中是的一部分,我想创建一个Dataframe,其中包含中的code>。 以下是一个例子: 注: 我的DataFrame可能有多个列,但是必须仅在列上进行匹配。
我有两个数据帧,它们的列名相同,但行数不同。第一个数据帧(a)看起来与此类似: 注:站点5、6、8和12故意丢失。 第二个数据帧(b)看起来像这样: 我想要实现的是: 在那里我注入(我肯定有一个更好的术语)数据帧b到数据帧a的数据,但是我想用零替换b中的任何NAs,并保持a中的NAs不变。 我发现并尝试了这个代码: 但它会带来NAs。我考虑先将NAs替换为零,但即使如此,它也会抹去我目前在数据帧a
我有两个不同列数和行数的CSV文件。第一个CSV文件有M列和N行,第二个文件有H列和G行。一些列具有相同的名称。 null 另外,如果两个CSV文件有两个数据帧,并希望这样做,例如,如果我将第一个CSV加载到中,将第二个加载到中,然后希望合并到,类似于上面的示例。
我和Spark一起在Databricks上工作。编程语言是Scala。 我有两个数据帧: 主数据框:见截图:1 查找数据帧:参见屏幕截图3 我想: 查找主数据框中“年龄”=-1的所有行 我对如何做这件事伤了脑筋。我唯一想到的是将dataframe存储为DataRicks中的表,并使用SQL语句(SQL.Context.SQL…),结果非常复杂。 我想知道是否有更有效的方法。 编辑:添加可复制的示例
我有两个数据帧,希望比较它们并返回第一个数据帧(df1)中不在第二个数据帧(df2)中的行。我找到了一种比较它们并返回差异的方法,但无法找出如何只返回df1中缺失的部分。
我试图基于同一数据帧的另一行向数据帧添加列。我想查找第一列值等于第5列中的值的行,并将第2、3和4列附加到dataframe,如下所示 我尝试创建另一个表来合并以各种组合,但没有运气。