当前位置: 首页 > 知识库问答 >
问题:

Apache Spark中的DataFrame平等

葛宪
2023-03-14

假设df1df2是Apache Spark中的两个DataFrames,使用两种不同的机制计算,例如Spark SQL和Scala/Java/Python API。

是否有一种惯用的方法来确定两个数据帧是否相等(相等,同构),其中等价性是由数据(每行的列名和列值)相同(除了行的顺序)来确定的

这个问题的动机是,通常有很多方法来计算一些大数据结果,每种方法都有自己的权衡。在探索这些权衡时,重要的是保持正确性,因此需要检查有意义的测试数据集的等效性/相等性。

共有3个答案

赫连照
2023-03-14

我不知道惯用语,但我认为您可以获得一种健壮的方法来比较数据帧,如下所述。(我使用PySpark进行说明,但这种方法可以跨语言使用。)

a = spark.range(5)
b = spark.range(5)

a_prime = a.groupBy(sorted(a.columns)).count()
b_prime = b.groupBy(sorted(b.columns)).count()

assert a_prime.subtract(b_prime).count() == b_prime.subtract(a_prime).count() == 0

这种方法可以正确处理数据帧可能具有重复行、不同顺序的行和/或不同顺序的列的情况。

例如:

python prettyprint-override">a = spark.createDataFrame([('nick', 30), ('bob', 40)], ['name', 'age'])
b = spark.createDataFrame([(40, 'bob'), (30, 'nick')], ['age', 'name'])
c = spark.createDataFrame([('nick', 30), ('bob', 40), ('nick', 30)], ['name', 'age'])

a_prime = a.groupBy(sorted(a.columns)).count()
b_prime = b.groupBy(sorted(b.columns)).count()
c_prime = c.groupBy(sorted(c.columns)).count()

assert a_prime.subtract(b_prime).count() == b_prime.subtract(a_prime).count() == 0
assert a_prime.subtract(c_prime).count() != 0

这种方法非常昂贵,但考虑到需要执行完全差异,大部分费用是不可避免的。这应该可以扩展,因为它不需要在本地收集任何东西。如果您放松了比较应该考虑重复行的限制,那么您可以删除groupBy(),只需执行subtract(),这可能会显著加快速度。

孔星宇
2023-03-14

Apache Spark测试套件中有一些标准方法,但是其中大部分涉及在本地收集数据,如果您想在大型数据帧上进行相等测试,那么这可能不是一个合适的解决方案

首先检查模式,然后可以与df3相交,并验证df1和df2的计数

另一个选项是获取两个数据帧的底层RDD,映射到(行,1),执行reduceByKey来计算每行的数量,然后将两个结果RDD进行组合,然后进行常规聚合,如果任何迭代器不相等,则返回false。

缪成天
2023-03-14

Scala(PySpark见下文)

spark fast测试库有两种进行数据帧比较的方法(我是该库的创建者):

assertSmallDataFrameEquality方法在驱动程序节点上收集数据帧并进行比较

def assertSmallDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
  if (!actualDF.schema.equals(expectedDF.schema)) {
    throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF))
  }
  if (!actualDF.collect().sameElements(expectedDF.collect())) {
    throw new DataFrameContentMismatch(contentMismatchMessage(actualDF, expectedDF))
  }
}

assertLargeDataFrameEquality方法比较分布在多台机器上的数据帧(代码基本上是从spark testing base复制的)

def assertLargeDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
  if (!actualDF.schema.equals(expectedDF.schema)) {
    throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF))
  }
  try {
    actualDF.rdd.cache
    expectedDF.rdd.cache

    val actualCount = actualDF.rdd.count
    val expectedCount = expectedDF.rdd.count
    if (actualCount != expectedCount) {
      throw new DataFrameContentMismatch(countMismatchMessage(actualCount, expectedCount))
    }

    val expectedIndexValue = zipWithIndex(actualDF.rdd)
    val resultIndexValue = zipWithIndex(expectedDF.rdd)

    val unequalRDD = expectedIndexValue
      .join(resultIndexValue)
      .filter {
        case (idx, (r1, r2)) =>
          !(r1.equals(r2) || RowComparer.areRowsEqual(r1, r2, 0.0))
      }

    val maxUnequalRowsToShow = 10
    assertEmpty(unequalRDD.take(maxUnequalRowsToShow))

  } finally {
    actualDF.rdd.unpersist()
    expectedDF.rdd.unpersist()
  }
}

assertSmallDataFrameEquality对于小数据帧比较来说更快,我发现它对于我的测试套件来说已经足够了。

派斯帕克

下面是一个简单的函数,如果数据帧相等,它将返回true:

def are_dfs_equal(df1, df2):
    if df1.schema != df2.schema:
        return False
    if df1.collect() != df2.collect():
        return False
    return True

您通常会在测试套件中执行DataFrame相等性比较,并且在比较失败时需要一条描述性错误消息(调试时,True/False返回值没有太大帮助)。

使用chispa库访问返回测试套件工作流的描述性错误消息的assert_df_equality方法。

 类似资料:
  • 我有一个基于maven的scala/java混合应用程序,可以提交spar作业。我的应用程序jar“myapp.jar”在lib文件夹中有一些嵌套的jar。其中之一是“common.jar”。我在清单文件中定义了类路径属性,比如。Spark executor抛出在客户端模式下提交应用程序时出错。类(com/myapp/common/myclass.Class)和jar(common.jar)在那里

  • 我正在ApacheSpark上的数据库中构建一个族谱,使用递归搜索来查找数据库中每个人的最终父级(即族谱顶部的人)。 假设搜索id时返回的第一个人是正确的家长 它给出以下错误 “原因:org.apache.spark.SparkException:RDD转换和操作只能由驱动程序调用,不能在其他转换中调用;例如,

  • 我有一个项目的RDD,还有一个函数 。 收集RDD的两个小样本,然后这两个数组。这很好,但无法扩展。 有什么想法吗? 谢谢 编辑:下面是如何压缩每个分区中具有不同项数的两个示例: 关键是,虽然RDD的. zip方法不接受大小不等的分区,但迭代器的. zip方法接受(并丢弃较长迭代器的剩余部分)。

  • 问题内容: 也许有人可以帮助我。我试图将以下ist放到pandas数据框中: 结果应如下所示: 但是我尝试做的所有事情都无法获得预期的结果。我用了这样的东西: 但是然后我松开了_source字段之外的类型。我也尝试与 但是我不知道如何使用字段_source并将其附加到原始数据帧。 有人知道如何做到这一点并达到预期的结果吗? 问题答案: 用途:

  • 我试图在火花笔记本的阿帕奇火花中做NLP。对于这个特定的例子,我正在使用库https://opennlp.apache.org创建一个块来提取名词短语。由于数据量的增加,我需要转向分布式计算。 问题是我无法广播我的chunker对象。通过阅读文档(只在board上投射数组等简单对象),我尝试了以下方法: 但这会引发以下错误: 如果我将chunker的初始化封装在函数中,然后在map方法中调用函数,

  • 给定一个包含以下格式数据的大文件(V1,V2,…,VN) 我正在尝试使用Spark获得一个类似于下面的配对列表 我尝试了针对一个较旧的问题所提到的建议,但我遇到了一些问题。例如, 我得到了错误, 有人能告诉我哪些地方我可能做得不对,或者有什么更好的方法可以达到同样的效果?非常感谢。