当前位置: 首页 > 知识库问答 >
问题:

“Exchange HashPartitioning”在spark中如何工作

欧浩淼
2023-03-14

目前,我使用按列重新分区和分区数将数据移动到特定分区。该列标识相应的分区(从0开始到(固定)n)。结果是Scala/Spark生成了一个意想不到的结果,并创建了更少的分区(其中一些是空的)。也许是哈希碰撞?

为了解决这个问题,我试图找出原因,试图找到变通办法。我找到了一个解决办法,将dataframe转换为rdd,并将partitionBy与HashPartitioner一起使用。令我惊讶的是:我得到了预期的结果。但是,将dataframe转换为RDD对我来说不是一个解决方案,因为它需要太多的资源。

我已经在

    scala> import org.apache.spark.HashPartitioner
    import org.apache.spark.HashPartitioner

    scala> val mydataindex = Array(0,1, 2, 3,4)
    mydataindex: Array[Int] = Array(0, 1, 2, 3, 4)

    scala> val mydata = sc.parallelize(for {
         |  x <- mydataindex
         |  y <- Array(123,456,789)
         | } yield (x, y), 100)
    mydata: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

    scala> val rddMyData = mydata.partitionBy(new HashPartitioner(5))
    rddMyData: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:26

    scala> val rddMyDataPartitions =   rddMyData.mapPartitionsWithIndex{
         |                 (index, iterator) => {
         |                    val myList = iterator.toList
         |                    myList.map(x => x + " -> " + index).iterator
         |                 }
         |              }
    rddMyDataPartitions: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:26

    scala>
         | // this is expected:

    scala> rddMyDataPartitions.take(100)
    res1: Array[String] = Array((0,123) -> 0, (0,456) -> 0, (0,789) -> 0, (1,123) -> 1, (1,456) -> 1, (1,789) -> 1, (2,123) -> 2, (2,456) -> 2, (2,789) -> 2, (3,456) -> 3, (3,789) -> 3, (3,123) -> 3, (4,789) -> 4, (4,123) -> 4, (4,456) -> 4)

    scala> val dfMyData = mydata.toDF()
    dfMyData: org.apache.spark.sql.DataFrame = [_1: int, _2: int]

    scala> val dfMyDataRepartitioned = dfMyData.repartition(5,col("_1"))
    dfMyDataRepartitioned: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_1: int, _2: int]

    scala> dfMyDataRepartitioned.explain(false)
    == Physical Plan ==
    Exchange hashpartitioning(_1#3, 5)
    +- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#3, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#4]
       +- Scan ExternalRDDScan[obj#2]

    scala> val dfMyDataRepartitionedPartition  = dfMyDataRepartitioned.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").count()
    dfMyDataRepartitionedPartition: org.apache.spark.sql.DataFrame = [partition_id: int, count: bigint]

    scala> // this is unexpected, because 1 partition has more indexes

    scala> dfMyDataRepartitionedPartition.show()
    +------------+-----+
    |partition_id|count|
    +------------+-----+
    |           1|    6|
    |           3|    3|
    |           4|    3|
    |           2|    3|
    +------------+-----+

有人能指导我这个“Exchange HashPartitioning”(请参阅上面的解释输出)是如何工作的吗?

2019-01-16 12:20:这不是HashPartitioner如何工作的重复?因为我对整数列上按列重新分区(+数分区)的哈希算法感兴趣。general HashPartitioner正像您在源代码中看到的那样正常工作。

共有1个答案

林鹏鹍
2023-03-14

这里没有什么出乎意料的。正如在HashPartitioner如何工作中所解释的那样?Spark采用hash(key)的模划分数和非均匀分布,特别是在小数据集上表现不明显。

预计datasetrdd之间也会有差异,因为两者使用不同的哈希函数(同上)。

最后

scala> dfMyDataRepartitioned.rdd.getNumPartitions
res8: Int = 5
 类似资料:
  • 在Spark中是如何工作的? 如果我们注册一个对象作为一个表,会将所有数据保存在内存中吗?

  • 下面是一个示例火花代码,它将转换为: Scala没有方法,它来自Spark Implicits。这里的数据集是如何创建的? 编辑:我确实看过其他SO答案,但没有看到一个示例,说明如何在中使用隐式。我在评论中参考了示例答案。

  • 假设我有一个256 KB的文件存储在HDFS文件系统中的一个节点(作为两个块,每个块128 KB)。该文件内部包含两个块,每个块128 KB。假设我有两个节点集群,每个节点只有1个核心。我的理解是,转换过程中的spark将读取内存中一个节点上的完整文件,然后将一个文件块内存数据传输到另一个节点,以便两个节点/核心可以并行执行它?那是正确的吗? 如果两个节点都有两个核心,而不是一个核心呢?在这种情况

  • 所以,我有一个RDD,它有如下键值对。 在groupByKey之后,我希望得到这样的东西 然而,我发现即使在执行groupByKey()之后,也会重复相同的键。键值对的总数肯定会减少,但仍然有许多重复的键。有什么问题吗? 键的类型基本上是一个Java类,其中包含整数类型的字段。火花是否也在考虑对象字段以外的东西来识别这些对象?

  • 我想展平元组的RDD(使用无操作映射),但我得到了一个类型错误: 给予 错误:类型不匹配; 找到:(Int,String)必需:TraversableOnce[?] ap.flat地图(x= s或s的等效列表可以正常工作,例如: Scala能处理吗?如果没有,为什么没有?

  • 我对Spark很陌生,不太了解基础知识,我只是为了解决一个问题而跳入其中。该问题的解决方案包括制作一个边具有字符串属性的图(使用GraphX)。用户可能希望查询这个图,我通过只过滤那些具有字符串属性的边来处理查询,该属性等于用户的查询。 现在,我的图形有超过1600万条边;当我使用计算机的所有8个核心时,创建图形需要10分钟以上。然而,当我查询这个图时(就像我上面提到的那样),我会立即得到结果(令