当前位置: 首页 > 知识库问答 >
问题:

Spark:当我保存到HDFS时内存不足错误

赵雪峰
2023-03-14

当我将大数据保存到hdfs时,我正在体验OOME

val accumulableCollection = sc.accumulableCollection(ArrayBuffer[String]())
val rdd = textfile.filter(row => {
    if (row.endsWith(",")) {
        accumulableCollection += row
        false
    } else if (row.length < 100) {
        accumulableCollection += row
        false
    }
    valid
})
rdd.cache()
val rdd2 = rdd.map(_.split(","))
val rdd3 = rdd2.filter(row => {
    var valid = true
    for((k,v) <- fieldsMap if valid ) {
        if (StringUtils.isBlank(row(k)) || "NULL".equalsIgnoreCase(row(k))) {
            accumulableCollection += row.mkString(",")
            valid = false
        }
    }
    valid
})
sc.parallelize(accumulableCollection.value).saveAsTextFile(hdfsPath)

我在Spark-Submit中使用这个:

--num-executors 2 --driver-memory 1G --executor-memory 1G --executor-cores 2
15/04/12 18:46:49 WARN scheduler.TaskSetManager: Stage 4 contains a task of very large size (37528 KB). The maximum recommended task size is 100 KB.
15/04/12 18:46:49 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 8, worker4, PROCESS_LOCAL, 38429279 bytes)
15/04/12 18:46:49 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 4.0 (TID 9, worker3, PROCESS_LOCAL, 38456846 bytes)
15/04/12 18:46:50 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 4.0 (TID 10, worker4, PROCESS_LOCAL, 38426488 bytes)
15/04/12 18:46:51 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 4.0 (TID 11, worker3, PROCESS_LOCAL, 38445061 bytes)
15/04/12 18:46:51 INFO cluster.YarnClusterScheduler: Cancelling stage 4
15/04/12 18:46:51 INFO cluster.YarnClusterScheduler: Stage 4 was cancelled
15/04/12 18:46:51 INFO scheduler.DAGScheduler: Job 4 failed: saveAsTextFile at WriteToHdfs.scala:87, took 5.713617 s
15/04/12 18:46:51 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job aborted due to stage failure: Serialized task 8:0 was 38617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.)
Exception in thread "Driver" org.apache.spark.SparkException: Job aborted due to stage failure: **Serialized task 8:0 was 30617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes)** - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.

当我增加框架时,现在的错误是:Java.lang.outofMemoryError:Java堆空间,所以我必须将驱动程序内存和执行程序内存增加到2G才能工作。如果累加Collection.value.length是500,000,我需要使用3G。这正常吗?

该文件只有146MB,包含200,000行(对于2G内存)。(在HDFS中,它被分成两个分区,每个分区包含73MB)

共有1个答案

斜成济
2023-03-14

意思差不多就是上面说的。您正试图序列化一个非常大的单个对象。您可能应该重写代码以避免这样做。

例如,我不清楚为什么要尝试更新可累加集合,并且在过滤器中这样做,该过滤器甚至可以执行多次。然后缓存RDD,但您已经尝试在驱动程序上有一个副本?然后将其他值添加到本地集合中,然后再次将其转换为RDD?

为什么要累积收藏?只需在RDDS上操作。这里有很多冗余。

 类似资料:
  • 我正在尝试将我的模型保存为从spark ml库创建的对象。 但是,它给了我一个错误: 以下是我的依赖项: 我还想将从模型生成的dataframe保存为CSV。

  • 我已经建立了一个Spark and Flink k-means应用程序。我的测试用例是一个3节点集群上的100万个点的集群。 当内存瓶颈开始时,Flink开始外包给磁盘,工作缓慢,但工作正常。然而,如果内存已满,Spark将失去执行器,并再次启动(无限循环?)。 我尝试在邮件列表的帮助下自定义内存设置,谢谢。但是火花仍然不起作用。 是否需要设置任何配置?我是说Flink的记忆力很差,斯帕克也必须能

  • 我试图使用Spark主机在EC2上使用本指南对常见爬网数据执行简单转换,我的代码如下所示:

  • 问题内容: 我是Netbeans中这种错误的新手。我一直在使用Java Bean 8.0.2在Java J2SE中工作。我正在对字符串进行模糊搜索,通常字符串长度为300-500。我正在使用Levenshtein和Jaro Winkler算法来查找字符串之间的距离。大约有1500次迭代来查找字符串之间的距离!问题是我的Net Bean通常会为以下内容提供错误: 我已经在线进行了一些搜索来摆脱此错误

  • 上周,我们在生产环境中遇到了内存不足的错误。这种内存不足的错误可能每周发生一次,当前的解决方案是重新启动应用程序服务器。我们使用的是glassfish 3.0.1。生成的堆转储约为5GB。 请帮助分析下面的堆转储。下面是使用eclipse MAT生成的泄漏嫌疑人报告。我们如何分析下面的报告?

  • null 当我运行上面的代码,然后该表以激发内存时,它占用的内存<2GB-与集群可用的内存相比很小-然后当我试图数据到驱动程序节点时,我会得到一个OOM错误。 我已尝试在以下设置上运行: 具有32个内核和244GB RAM的计算机上的本地模式 具有10 x 6.2GB执行程序和61GB驱动程序节点的独立模式 我的问题: 缓存后占用空间如此之少的数据文件怎么会导致内存问题? 在我转向可能损害性能的其