当前位置: 首页 > 知识库问答 >
问题:

通过跨多个源的连接来激发ETL

燕和同
2023-03-14

我有一个要求,加入一组3键使用火花数据帧50余个文件。我有驱动程序表,每天有100000条记录。我使用如下数据帧将这个表与53个其他文件连接起来。

val df1 = spark.read.parquet(<driver file>)
val df2 = spark.read.parquet(<right side file1>)
.
.
val df52 = spark.read.parquet(<right side file 52>)
//join
val refinedDF1 = df1.join(df2,Seq("key1","key2","key3"),"leftouter")).select(<some from left table>, <some from right table>)
val refinedDF2 = refinedDF1.join(df3,Seq("key1","key2","key3"),"leftouter")).select(<some from left table>, <some from right table>)
 .
 .
 so on for all 50 odd files
 refinedFinalDF.write.parquet(<s3 location>)

执行失败,出现错误

容器退出,退出代码为非零52

这基本上是一个内存不足的例外。我有一个相当大的集群,包含100000条记录的数据集。我有一个EMR,有12个执行器,每个16G,驱动器内存20G。

我尝试用df手动将数据帧划分为200个分区。以循环方式重新分区(200),这根本没有帮助。在连接键中,只有key1对于所有记录是不同的,key2和key3对于所有记录都是相同的值。是否有任何优化可以使其工作?我试图保存的最后一个数据帧中有140列。如果驱动程序表有n条记录,那么在每个左外之后,我只会有n条。

更新:我已经尝试创建一个较小的数据框的驱动程序表与限制(100),我仍然得到内存溢出异常。

共有3个答案

濮阳原
2023-03-14

我有一个类似的情况,我有多个联接,最后我必须将最终数据帧写入 HDFS/Hive 表(拼花格式)。

Spark基于惰性执行机制工作,这意味着,当您的第53个数据帧被操作(保存/写入为Parquet)时,Spark会返回到所有的连接并执行它们,这会导致数据的巨大混乱,最终您的作业的容器会失败并抛出内存不足错误。

建议:您可以先将每个加入的数据帧写入HDFS,我的意思是一旦您加入了2个(可以超过2个,但保持限制)数据帧,将加入的数据帧写入HDFS/Hive并使用选择*'hive拼花表

val refinedDF1 = df1.join(df2 ,condition,'join_type')
refinedDF1.write.parquet("location") or refinedDF1.write.mode("overwrite").saveAsTable("dbname.refine1")
val refinedDF1 = hc.sql("select * from dbname.refine1")

val refinedDF2 = refinedDF1.join(df3)
refinedDF2.write.parquet("location") or refinedDF1.write.mode("overwrite").saveAsTable("dbname.refine2")
val refinedDF2 = hc.sql("select * from dbname.refine2")

现在你经常将连接写入 hdfs,这意味着 Spark 在调用最终连接时不必执行它们,它只需要使用你以表形式保存的 52'nd 连接输出。

通过使用这种方法,我的脚本从22小时(包括容器内存错误)下降到15到30分钟(没有内存异常/错误)。

小提示:

1) 排除join<code>键</code>为空的记录,spark与具有<code>null=null</code>条件的join相比性能不佳,因此在连接数据帧之前将其删除

2)当左侧数据帧是多行,右侧数据帧是一个查找或几行时,使用广播连接。

3)在脚本执行后,您必须清理保存在Hive/Hdfs中的中间数据帧。

陈德泽
2023-03-14

碰巧的是,我用来创建数据框的s3存储桶中的底层数据有多个文件夹,我正在过滤特定文件夹作为过滤器的一部分。例如:spark.read.parquet(s3存储桶)。过滤器('folder_name="val")。看起来火花正在将s3存储桶中的所有数据加载到执行器内存中,然后运行过滤器。这就是为什么它被轰炸了,因为相同的逻辑运行在蜂巢外部表上运行,指向s3位置的文件夹,因为分区列工作得很好。我不得不删除过滤器并读取特定文件夹来解决问题…spark.read.parquet(s3存储桶/文件夹=值)…

松鸣
2023-03-14

你们的桌子是一对一还是一对一?如果它们是一对多,那么您的联接将产生比您可能想要的更多的行。如果是这种情况,一个选项是首先对要加入的每个表执行groupBy。考虑这个例子:

val df1 = Seq(1, 2).toDF("id")
val df2 = Seq(
  (1, "a", true),
  (1, "b", false),
  (2, "c", true)
).toDF("id", "C2", "B2")

val df3 = Seq(
  (1, "x", false),
  (1, "y", true),
  (2, "z", false)
).toDF("id", "C3", "B3")

// Left outer join without accounting for 1-Many relationship.  Results in cartesian
// joining on each ID value!
df1.
  join(df2, Seq("id"), "left_outer").
  join(df3, Seq("id"), "left_outer").show()

+---+---+-----+---+-----+
| id| C2|   B2| C3|   B3|
+---+---+-----+---+-----+
|  1|  b|false|  y| true|
|  1|  b|false|  x|false|
|  1|  a| true|  y| true|
|  1|  a| true|  x|false|
|  2|  c| true|  z|false|
+---+---+-----+---+-----+

或者,如果在连接之前对行进行分组,使关系始终为1-1,则不会添加记录

val df2Grouped = df2.groupBy("id").agg(collect_list(struct($"C2", $"B2")) as "df2")
val df3Grouped = df3.groupBy("id").agg(collect_list(struct($"C3", $"B3")) as "df3")

val result = df1.
  join(df2Grouped, Seq("id"), "left_outer").
  join(df3Grouped, Seq("id"), "left_outer")
result.printSchema
result.show(10, false)

scala> result.printSchema
root
 |-- id: integer (nullable = false)
 |-- df2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- C2: string (nullable = true)
 |    |    |-- B2: boolean (nullable = false)
 |-- df3: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- C3: string (nullable = true)
 |    |    |-- B3: boolean (nullable = false)


scala> result.show(10, false)
+---+-----------------------+-----------------------+
|id |df2                    |df3                    |
+---+-----------------------+-----------------------+
|1  |[[a, true], [b, false]]|[[x, false], [y, true]]|
|2  |[[c, true]]            |[[z, false]]           |
+---+-----------------------+-----------------------+
 类似资料:
  • 问题内容: 我正在运行一台正在监听的服务器,公开了2个服务:和。由于这两项服务都可以从访问,因此我只想从存根拨打该地址。 服务器看起来像这样: 为什么每种服务都需要拨不同的插座? 而且由于代码基本上是重复的,以适应每种服务,所以我不能只使用an 来减少代码吗? 然后使用它为每个服务实现客户端功能,而不是为每个服务创建新功能。我已经找到了cmux,但是必须有一种无需使用外部库即可执行此操作的方法。

  • 问题内容: 我知道这很长,但是请忍受我。这个问题很容易理解,只需花一些时间就可以完全解释它。 现在我遇到这个错误 我已经阅读了文档中的所有内容,但仍然找不到解决我问题的方法。 我在私有在线源上使用$ http.get,该私有源上的数据与json文件的形式相似(因此我无法修改数据)。数据如下所示: 我正在尝试将每个项目的videoId插值到嵌入YouTube视频的HTML iframe中。在我的co

  • 问题内容: 我正在通过Node.js Express应用程序中的Mongoose与MongoDB建立单一连接: 然后,我定义架构,然后定义模型,最后定义从数据库中拉出所有用户的控制器: 这些数据库连接在应用程序启动期间打开,并且在node.js应用程序期间保持打开状态。 让我困扰的是为什么我要打开5个连接?关闭node.js应用程序后,所有5个连接都将关闭。 相关说明 :对于REST API服务器

  • 问题内容: 我正在尝试通过蓝牙两个设备连接。我已经能够做到,但是当连接启动时,操作系统要求我提供配对代码。 我要做的是以编程方式提供该代码。有没有一种方法可以连接这些设备并发送配对代码而无需用户插入? 注意:我确实有配对代码,只是我不希望用户插入它,而是由应用程序从保存它的地方获取并使用它。 注意_2:必须使用配对码。因此,不能使用createInsecureRfcommSocketToServi

  • 问题内容: 我需要一个有关如何通过wifi在android上处理数据的教程。Fe,我需要向PC发送一些消息并接收其他消息。我应该如何建立连接?我应该怎么做才能传输数据?如何传输? 对不起,这个假人的问题,但是我找不到好的手册。我将不胜感激,因为它提供了尽可能详尽的手册,或者提供了一些有关建立wifi连接和发送/接收简单消息的简单示例。 问题答案: 您无需使用任何API即可连接到wifi,仅当您想在

  • 我正在尝试设置 rabbitmq 它可以通过 nginx 从外部(从非本地主机)访问。 nginx-rabbitmq.conf: rabbitmq.conf文件: 默认情况下,来宾用户只能从本地主机进行交互,因此我们需要创建另一个具有所需权限的用户,如下所示: 但是,当我尝试通过pika连接到Rabbitmq时,我得到了ConntionClo的异常 --[引发连接关闭异常]-- 如果我使用相同的参