我已经在spark用户论坛上发布了这个问题,但没有收到回复,所以再次在这里提问。
我们有一个用例,我们需要做笛卡尔连接,由于某种原因,我们无法让它与数据集 API 一起工作。
我们有两个数据集:
>
我们广播了这个数据集
val ds1 = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2") ds1.count val ds2 = spark.read.format("csv").load(<s3-location>).toDF("c11", "c12", "c13", "c14", "c15", "ts") ds2.count ds2.crossJoin(broadcast(ds1)).filter($"c1" <= $"c11" && $"c11" <= $"c2").count
如果我使用RDD api实现它,我在ds1中广播数据,然后在ds2中过滤数据,它工作得很好。
我已确认广播成功。
2019-02-14 23:11:55 信息 代码生成器:54 - 代码生成于 10.469136 毫秒 2019-02-14 23:11:55 信息 洪流广播:54 - 开始读取广播变量 29 2019-02-14 23:11:55 信息 洪流广播:54 - 读取广播变量 29 需要 6 毫秒 2019-02-14 23:11:56 信息 代码生成器:54 - 在 11.280087 毫秒内生成的代码
查询计划:
= = Physical Plan = = < br > BroadcastNestedLoopJoin BuildRight,Cross,((c1#68
那么这个阶段就不会进步。
我更新了代码以使用广播ds1,然后在ds2的mapPartitions中加入。
val ranges = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2").collect val rangesBC = sc.broadcast(ranges)
然后在 mapPartitions 方法中使用此 rangesBC 来标识 ds2 中每一行所属的范围,此作业在 3 小时内完成,而另一个作业即使在 24 小时后仍未完成。这意味着查询优化器没有做我希望它做的事情。
我做错了什么?任何指针都会有帮助。非常感谢。
我不知道您是使用裸机还是使用带spot、按需或专用的AWS,或者是带AZURE的虚拟机,等等。我的看法是:
然后:
所以,为了回答你的问题:-你没有做错什么。
我最近遇到了这个问题,发现 Spark 在交叉连接大型数据帧时具有奇怪的分区行为。如果输入数据帧包含几百万条记录,则交叉联接数据帧的分区等于输入数据帧分区的乘法,即
crossJoinDF的分区ds1的分区)ds2的分区)。
如果 ds1 或 ds2 包含大约几百个分区,则交叉联接数据帧将具有 ~ 10,000 范围内的分区。这些分区太多了,这会导致管理许多小任务的开销过大,使得交叉连接的数据帧上的任何计算(在您的情况下 - 过滤器)的运行速度非常慢。
那么,如何使计算速度更快呢?首先检查这是否确实是您的问题所在:
scala> val crossJoinDF = ds2.crossJoin(ds1)
# This should return immediately because of spark lazy evaluation
scala> val crossJoinDFPartitions = crossJoinDF.rdd.partitions.size
检查交叉连接数据框上的分区数。如果交叉JoinDF分区
要加快交叉连接数据帧的操作速度,请减少输入数据帧的分区数量。例如:
scala> val ds1 = ds1.repartition(40)
scala> ds1.rdd.partitions.size
res80: Int = 40
scala> val ds2 = ds2.repartition(40)
scala> ds2.rdd.partitions.size
res81: Int = 40
scala> val crossJoinDF = ds1.crossJoin(ds2)
scala> crossJoinDF.rdd.partitions.size
res82: Int = 1600
scala> crossJoinDF.count()
count()
操作应导致执行交叉连接。计数现在应在合理的时间内返回。您选择的确切分区数取决于群集中可用的核心数。
这里的关键是确保您的交叉连接数据帧具有合理数量的分区(
我正在尝试创建一个spark应用程序,它对创建、读取、写入和更新MySQL数据非常有用。那么,有没有办法使用Spark创建一个MySQL表? 下面是在MySQL数据库中创建表的Scala JDBC代码。我怎样才能通过Spark做到这一点?
我有一个数据集是从 记录此数据集的min和max。让[min, max]作为根节点。然后它有100个点。让[min, min d)和[max-d, max](其中d=(max-min)/2)分别是左右子节点。继续这样做。当节点小于或等于5个点(在数据集中)时停止。如何找到每个节点的点数? 我已经用节点构造了原始树(1,2,3......从上到下,从左到右),现在我想插入每个节点的数据。我还编写了拆
问题内容: 我正在尝试训练数据不平衡的网络。我有A(198个样本),B(436个样本),C(710个样本),D(272个样本),并且我已经阅读了有关“weighted_cross_entropy_with_logits”的信息,但是我发现的所有示例都是针对二进制分类的,因此我不太了解对如何设置这些权重充满信心。 样本总数:1616 A_weight:198/1616 = 0.12? 如果我理解的话
问题内容: 我正在使用MySQL。这是我的桌子 我需要一个选择查询来显示类似这样的表。 问题答案: 您可以使用此查询- 它产生您想要的结果。但是,如果您想动态地进行操作,请参阅这篇文章“自动执行数据透视表查询”-http: //www.artfulsoftware.com/infotree/queries.php#523,或者该链接- 动态数据透视表。
我正在用Kafka设计一个spark流媒体应用程序。我有以下几个问题:我正在将数据从RDBMS表流式传输到kafka,并使用Spark consumer来使用消息,并使用Spark-SQL进行处理 问题:1。我将数据从表中流式传输到kafka as(键作为表名,值作为JSON记录形式的表数据)——这是正确的体系结构吗? 这种数据库流的架构和设计是否正常,我如何解决转换问题中的转换? 你好Piyus
我正在尝试读取包含700K条记录的Excel文件,并将这些记录批量插入MySQL数据库表中。 请注意,Excel解析速度很快,我可以在50秒左右的时间内将实体对象放入中。 我使用Spring Boot和Spring数据JPA。 下面是我的部分文件: 以及我的部分: 以下是我的 : 下面是类: 有人能告诉我我在这里做了什么不正确的事情吗? 编辑: 进程未完成并最终抛出错误:- 谢谢