当前位置: 首页 > 知识库问答 >
问题:

Spark:连接2个大型DFs时,大小超过Integer.max_value

童花蜂
2023-03-14

各位,

当我试图将spark中的两个大型数据流(每个100GB+)连接到每行一个密钥标识符时,我遇到了这个问题。

我在EMR上使用Spark1.6,下面是我正在做的事情:

val df1 = sqlContext.read.json("hdfs:///df1/")
val df2 = sqlContext.read.json("hdfs:///df2/")

// clean up and filter steps later 

df1.registerTempTable("df1")
df2.registerTempTable("df2")

val df3 = sql("select df1.*, df2.col1 from df1 left join df2 on df1.col3 = df2.col4")

df3.write.json("hdfs:///df3/")
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)
    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420)
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
    null

我也尝试过使用更大的集群,但没有帮助。此链接表示,如果洗牌分区大小超过2GB,将引发此错误。但是我尝试将分区数量增加到一个很高的值,仍然没有运气。

我怀疑这可能与懒惰加载有关。当我在一个DF上做10个操作时,它们只在最后一步执行。我尝试在不同的DFs存储级别上添加.persist(),但没有成功。我还尝试删除临时表,清空所有早期的DFs以进行清理。

然而,如果我将代码分解为两个部分,代码就可以工作了--将最后的临时数据(2个数据帧)写入磁盘,退出。重新启动以只联接两个DF。

Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin.doExecute(BroadcastHashOuterJoin.scala:113)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.DataFrame.toJSON(DataFrame.scala:1724)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

共有1个答案

楚博雅
2023-03-14

在spark u中,洗牌块不能大于2GB。这是因为,Spark存储将块洗牌为ByteBuffer。以下是如何分配它:

ByteBuffer.Allocate(int ;容量)

因为ByteBuffer受integer.max_size(2GB)的限制,shuffle块也是如此!!解决方案是通过在SparkSQL中使用spark.sql.shuffle.partitions,或者通过对RDD使用rdd.partition()或rdd.colease()来增加分区的数量,这样每个分区的大小都<=2GB。

 类似资料:
  • 嘿,我正在使用Glassfish开源v4,我遇到了一个奇怪的问题。 我在管理控制台中定义了到Oracle 11g的JDBC连接池,并设置了: 初始和最小池大小:500 最大游泳池大小:1000 池大小调整数量::750 我已经为这个连接池创建了一个特定的用户。然而,有时当我检查数据库中打开的连接时,我发现有1000多个连接(我看到的最大连接数是1440个) 当发生这种情况时,任何查询尝试都会失败,

  • 我最近在spark工作,遇到了一些我仍然无法解决的问题。 假设我有一个100GB的数据集,集群的ram大小是16GB。 现在,我知道在简单地读取文件并将其保存在HDFS中的情况下,Spark将为每个分区执行它。当我对100GB数据执行排序或聚合转换时会发生什么?它将如何在内存中处理100GB,因为我们需要整个数据来进行排序? 我已经通过下面的链接,但这只告诉我们火花在持久化的情况下做什么,我正在寻

  • **dataframe2:从另一个来源获得的键的Dataframe(这些键是上表中ID列的分区键)-此表中不同键的数量约为0.15万** 现在,此代码总是导致“com.datastax.oss.driver.api.core.servererrors.ReadFailureException:在一致性LOCAL_ONE读取查询期间Cassandra失败(需要1个响应,但只有0个副本响应,1个失败)

  • 问题内容: 在开发过程中,本地WAMP服务器如何从测试服务器获取最新数据是对数据库进行了转储,然后使用source命令上载该转储以加载.sql文件。 最近,在导入的最后,我们遇到了有关@old变量的错误,这些变量在更改原始设置(如外键约束)之前存储了这些设置(因此请关闭外键约束,以使导入不会在以下情况下引发错误)它会重新创建表,并在尚未创建表之一时尝试创建外键。我发现原因是产品表获取越来越多的数据

  • 我有一个与我的Ng2项目的3-4天的问题。 @angular/cli:1.0。0-rc。2 节点:6.9。2 操作系统:win32 x64 @角度/普通:2.4。9 @角度/编译器:2.4。9 @角度/核心:2.4。9 @角度/形状:2.4。9 @angular/http:2.4。9 @角度/平台浏览器:2.4。9 @角度/平台浏览器动态:2.4。9 @角度/路由器:3.4。9 @angular/

  • 我的表: 插入查询: 我的页面大小是16KB。因此,我的表中的一行最多可以包含8192字节(即8KB)。 我创建了11个列(每个255个字符),其中这11列最多可以容纳字符。 如果我存储2805-3字节的字符,它将需要(