当前位置: 首页 > 知识库问答 >
问题:

使用数据集在Apache Spark中交叉加入非常慢

晏富
2023-03-14

我已经在spark用户论坛上发布了这个问题,但没有收到回复,所以再次在这里提问。

我们有一个用例,我们需要做笛卡尔连接,由于某种原因,我们无法让它与数据集 API 一起工作。

我们有两个数据集:

>

  • 一个包含 2 个字符串列的数据集表示 C1、C2。它是一个包含 ~100 万条记录的小数据集。这两列都是 32 个字符的字符串,因此应小于 500 mb。

    我们广播了这个数据集

    val ds1 = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2")
    ds1.count
    val ds2 = spark.read.format("csv").load(<s3-location>).toDF("c11", "c12", "c13", "c14", "c15", "ts")
    ds2.count
    ds2.crossJoin(broadcast(ds1)).filter($"c1" <= $"c11" && $"c11" <= $"c2").count
    

    如果我使用RDD api实现它,我在ds1中广播数据,然后在ds2中过滤数据,它工作得很好。

    我已确认广播成功。

    2019-02-14 23:11:55 信息 代码生成器:54 - 代码生成于 10.469136 毫秒 2019-02-14 23:11:55 信息 洪流广播:54 - 开始读取广播变量 29 2019-02-14 23:11:55 信息 洪流广播:54 - 读取广播变量 29 需要 6 毫秒 2019-02-14 23:11:56 信息 代码生成器:54 - 在 11.280087 毫秒内生成的代码

    查询计划:

    = = Physical Plan = = < br > BroadcastNestedLoopJoin BuildRight,Cross,((c1#68

    那么这个阶段就不会进步。

    我更新了代码以使用广播ds1,然后在ds2的mapPartitions中加入。

    val ranges = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2").collect
    val rangesBC = sc.broadcast(ranges)
    

    然后在 mapPartitions 方法中使用此 rangesBC 来标识 ds2 中每一行所属的范围,此作业在 3 小时内完成,而另一个作业即使在 24 小时后仍未完成。这意味着查询优化器没有做我希望它做的事情。

    我做错了什么?任何指针都会有帮助。非常感谢。

  • 共有2个答案

    干京
    2023-03-14

    我不知道您是使用裸机还是使用带spot、按需或专用的AWS,或者是带AZURE的虚拟机,等等。我的看法是:

      < li >认识到10米x 1米的工作量很大,即使。过滤器应用于结果交叉连接。这需要一些时间。你的期望是什么? < li>Spark通常都是以线性方式扩展的。 < li >具有虚拟机的数据中心没有专用的,因此没有最快的性能。

    然后:

    • 我在数据库10Mx100K上运行了一个模拟设置,核心为0.86,社区版驱动程序为6GB。这运行了17分钟。
    • 我在4节点AWS EMR非专用集群上运行了您示例中的10Mx1M(有一些EMR奇怪之处,例如在有价值的实例上保留驱动程序!)花了3个小时才部分完成。

    所以,为了回答你的问题:-你没有做错什么。

    • 只是需要更多的资源,允许更多的并行化。
    • 如您所见,我确实添加了一些显式分区。
    祁正浩
    2023-03-14

    我最近遇到了这个问题,发现 Spark 在交叉连接大型数据帧时具有奇怪的分区行为。如果输入数据帧包含几百万条记录,则交叉联接数据帧的分区等于输入数据帧分区的乘法,即

    crossJoinDF的分区ds1的分区)ds2的分区)。

    如果 ds1 或 ds2 包含大约几百个分区,则交叉联接数据帧将具有 ~ 10,000 范围内的分区。这些分区太多了,这会导致管理许多小任务的开销过大,使得交叉连接的数据帧上的任何计算(在您的情况下 - 过滤器)的运行速度非常慢。

    那么,如何使计算速度更快呢?首先检查这是否确实是您的问题所在:

    scala> val crossJoinDF = ds2.crossJoin(ds1)
    # This should return immediately because of spark lazy evaluation
    
    scala> val crossJoinDFPartitions = crossJoinDF.rdd.partitions.size
    

    检查交叉连接数据框上的分区数。如果交叉JoinDF分区

    要加快交叉连接数据帧的操作速度,请减少输入数据帧的分区数量。例如:

    scala> val ds1 = ds1.repartition(40)
    scala> ds1.rdd.partitions.size 
    res80: Int = 40
    
    scala> val ds2 = ds2.repartition(40)
    scala> ds2.rdd.partitions.size 
    res81: Int = 40
    
    scala> val crossJoinDF = ds1.crossJoin(ds2)
    scala> crossJoinDF.rdd.partitions.size 
    res82: Int = 1600
    
    scala> crossJoinDF.count()
    

    count() 操作应导致执行交叉连接。计数现在应在合理的时间内返回。您选择的确切分区数取决于群集中可用的核心数。

    这里的关键是确保您的交叉连接数据帧具有合理数量的分区(

     类似资料:
    • 我正在尝试创建一个spark应用程序,它对创建、读取、写入和更新MySQL数据非常有用。那么,有没有办法使用Spark创建一个MySQL表? 下面是在MySQL数据库中创建表的Scala JDBC代码。我怎样才能通过Spark做到这一点?

    • 我有一个数据集是从 记录此数据集的min和max。让[min, max]作为根节点。然后它有100个点。让[min, min d)和[max-d, max](其中d=(max-min)/2)分别是左右子节点。继续这样做。当节点小于或等于5个点(在数据集中)时停止。如何找到每个节点的点数? 我已经用节点构造了原始树(1,2,3......从上到下,从左到右),现在我想插入每个节点的数据。我还编写了拆

    • 问题内容: 我正在尝试训练数据不平衡的网络。我有A(198个样本),B(436个样本),C(710个样本),D(272个样本),并且我已经阅读了有关“weighted_cross_entropy_with_logits”的信息,但是我发现的所有示例都是针对二进制分类的,因此我不太了解对如何设置这些权重充满信心。 样本总数:1616 A_weight:198/1616 = 0.12? 如果我理解的话

    • 问题内容: 我正在使用MySQL。这是我的桌子 我需要一个选择查询来显示类似这样的表。 问题答案: 您可以使用此查询- 它产生您想要的结果。但是,如果您想动态地进行操作,请参阅这篇文章“自动执行数据透视表查询”-http: //www.artfulsoftware.com/infotree/queries.php#523,或者该链接- 动态数据透视表。

    • 我正在用Kafka设计一个spark流媒体应用程序。我有以下几个问题:我正在将数据从RDBMS表流式传输到kafka,并使用Spark consumer来使用消息,并使用Spark-SQL进行处理 问题:1。我将数据从表中流式传输到kafka as(键作为表名,值作为JSON记录形式的表数据)——这是正确的体系结构吗? 这种数据库流的架构和设计是否正常,我如何解决转换问题中的转换? 你好Piyus

    • 我正在尝试读取包含700K条记录的Excel文件,并将这些记录批量插入MySQL数据库表中。 请注意,Excel解析速度很快,我可以在50秒左右的时间内将实体对象放入中。 我使用Spring Boot和Spring数据JPA。 下面是我的部分文件: 以及我的部分: 以下是我的 : 下面是类: 有人能告诉我我在这里做了什么不正确的事情吗? 编辑: 进程未完成并最终抛出错误:- 谢谢