当前位置: 首页 > 知识库问答 >
问题:

在HDFS上预共分组表和在Spark中用零洗牌读取表

司空昊阳
2023-03-14

作为spark作业的一部分,我有两个表要加入/合并,这在每次运行作业时都会引起很大的洗牌。我想通过存储一次共分组数据来摊销所有工作的成本,并将已经共分组的数据作为定期Spark运行的一部分,以避免洗牌。

为了实现这一点,我将一些HDFS中的数据存储在拼花板格式中。我正在使用Parquet重复字段来实现以下模式

(日期,[aRecords],[bRecords])

在这种情况下,aRecords和bRecords似乎可以有效地按日期合并。我可以执行如下操作:

case class CogroupedData(date: Date, aRecords: Array[Int], bRecords: Array[Int])

val cogroupedData = spark.read.parquet("path/to/data").as[CogroupedData]

//Dataset[(Date,Int)] where the Int in the two sides multiplied
val results = cogroupedData
    .flatMap(el => el.aRecords.zip(el.bRecords).map(pair => (el.date, pair._1 * pair._2)))

两者之间的区别在于,我避免了对已经合并的数据进行洗牌,合并的成本通过在HDFS上持久化来摊销。

现在回答问题。从cogrouped dataset中,我希望派生两个分组数据集,这样我就可以使用标准的Spark SQL运算符(如cogroup、join等)而不会导致洗牌。这似乎是可能的,因为第一个代码示例工作,但Spark仍然坚持在我加入/groupbykey/cogroup等时对数据进行散列/洗牌。

以下面的代码示例为例。我希望有一种方法,我们可以在执行联接时运行下面的内容而不会引起洗牌。

val cogroupedData = spark.read.parquet("path/to/data").as[CogroupedData]

val aRecords = cogroupedData
    .flatMap(cog => cog.aRecords.map(a => (cog.date,a)))
val bRecords = cogroupedData
    .flatMap(cog => cog.bRecords.map(b => (cog.date,b)))

val joined = aRecords.join(bRecords,Seq("date"))

查看文献,如果cogroupedData有一个已知的分区器,那么接下来的操作不应该引起洗牌,因为它们可以使用RDD已经分区的事实并保留分区器。

我认为要实现这一点,需要使用已知的分区器获得一个cogroupedData DataSet/RDD,而不会导致洗牌。

    null

有人有什么想法吗?

共有1个答案

汪思博
2023-03-14

你在这里犯了两个错误。

>

  • 今天(Spark2.3),Spark除了分区修剪之外,不使用分区信息进行查询优化。只使用扣饰。有关详细信息,请参见Spark是否知道DataFrame的分区键?。

    结论:要有任何机会优化,你必须使用转移瘤和扣环。

    一般来说,Spark不能优化“强类型”数据集上的操作。有关详细信息,请参见Spark2.0Dataset vs DataFrame,以及为什么谓词下推不用于类型化Dataset API(vs非类型化DataFrame API)?

    正确的做法是:

    >

  • 用扣。

    val n: Int
    someDF.write.bucketBy(n, "date").saveAsTable("df")
    

    删除函数式API,转而使用SQL API:

    import org.apache.spark.sql.functions.explode
    
    val df = spark.table("df")
    
    val adf = df.select($"date", explode($"aRecords").alias("aRecords"))
    val bdf = df.select($"date", explode($"bRecords").alias("bRecords"))
    
    adf.join(bdf, Seq("date"))
    

  •  类似资料:
    • 假设我有一个256 KB的文件存储在HDFS文件系统中的一个节点(作为两个块,每个块128 KB)。该文件内部包含两个块,每个块128 KB。假设我有两个节点集群,每个节点只有1个核心。我的理解是,转换过程中的spark将读取内存中一个节点上的完整文件,然后将一个文件块内存数据传输到另一个节点,以便两个节点/核心可以并行执行它?那是正确的吗? 如果两个节点都有两个核心,而不是一个核心呢?在这种情况

    • 我有一个大的(>500M行)CSV文件。这个CSV文件中的每一行都包含一个位于HDFS上的二进制文件的路径。我想使用Spark读取这些文件中的每一个,处理它们,并将结果写到另一个CSV文件或表中。 在驱动程序中执行此操作非常简单,下面的代码完成了这项工作 但是失败 是否可以让执行者直接访问HDFS文件或HDFS文件系统?或者,是否有一种有效的方法来读取HDFS/S3上数百万个二进制文件并用Spar

    • 我在AWS上有一个Hadoop/Yarn集群设置,我有一个主服务器和三个从服务器。我已经验证有3个活动节点在端口50070和8088上运行。我在客户机部署模式下测试了一个spark工作,一切都很好。 当我尝试使用。我得到以下错误。

    • 我在多个论坛上读到,当您的基础表被存储桶和排序时,在执行排序合并联接时,随机播放会减少。但是,我的问题如下 排序的存储桶只会保证存储桶中的数据大约是相同的一组键,并且数据被排序。假设我们有2个数据帧d1和d2,两者都被排序和扣分。 spark是否保证包含key1和key2数据的d1表的bucketx与包含key1与key2的d2表的bucky在同一台机器上 如果保证 bucketx 和 bucke

    • 我使用Spark SQL v2。4.7关于EMR(含纱线)。我编写Spark Sql查询来执行转换。 估计复杂查询的最佳随机分区数: 我正在尝试估计需要设置的最佳随机分区数,以便为具有多个连接的复杂查询获得最佳性能。在Internet上我发现分区的最佳大小应该在-的范围内。现在,由于我知道这个值,我的下一步是计算查询的数据随机体积(以MB为单位),然后将其除以以获得随机分区数。但是,对于涉及多个与