当前位置: 首页 > 知识库问答 >
问题:

按键分组时Spark内存不足

阎宝
2023-03-14

我试图使用Spark主机在EC2上使用本指南对常见爬网数据执行简单转换,我的代码如下所示:

package ccminer

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ccminer {
  val english = "english|en|eng"
  val spanish = "es|esp|spa|spanish|espanol"
  val turkish = "turkish|tr|tur|turc"
  val greek = "greek|el|ell"
  val italian = "italian|it|ita|italien"
  val all = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|")

  def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*")

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println("Bad command line")
      System.exit(-1)
    }

    val cluster = "spark://???"
    val sc = new SparkContext(cluster, "Common Crawl Miner",
      System.getenv("SPARK_HOME"), Seq("/root/spark/ccminer/target/scala-2.10/cc-miner_2.10-1.0.jar"))

    sc.sequenceFile[String, String](args(0)).map {
      case (k, v) => (langIndep(k), v)
    }
    .groupByKey(args(2).toInt)
    .filter {
      case (_, vs) => vs.size > 1
    }
    .saveAsTextFile(args(1))
  }
}
sbt/sbt "run-main ccminer.ccminer s3n://aws-publicdatasets/common-crawl/parse-output/segment/1341690165636/textData-* s3n://parallelcorpus/out/ 2000"
java.lang.OutOfMemoryError: Java heap space
at com.ning.compress.BufferRecycler.allocEncodingBuffer(BufferRecycler.java:59)
at com.ning.compress.lzf.ChunkEncoder.<init>(ChunkEncoder.java:93)
at com.ning.compress.lzf.impl.UnsafeChunkEncoder.<init>(UnsafeChunkEncoder.java:40)
at com.ning.compress.lzf.impl.UnsafeChunkEncoderLE.<init>(UnsafeChunkEncoderLE.java:13)
at com.ning.compress.lzf.impl.UnsafeChunkEncoders.createEncoder(UnsafeChunkEncoders.java:31)
at com.ning.compress.lzf.util.ChunkEncoderFactory.optimalInstance(ChunkEncoderFactory.java:44)
at com.ning.compress.lzf.LZFOutputStream.<init>(LZFOutputStream.java:61)
at org.apache.spark.io.LZFCompressionCodec.compressedOutputStream(CompressionCodec.scala:60)
at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:803)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:117)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

共有1个答案

杨鸿畅
2023-03-14

shuffle任务(如groupByKey、reduceByKey等)中出现java.lang.OutofMemoryError异常的最常见原因是并行性低。

可以通过在配置中设置spark.default.parallelism属性来增加默认值。

 类似资料:
  • 我正在研究一种需要对大矩阵进行数学运算的算法。基本上,该算法包括以下步骤: 输入:大小为n的两个向量u和v > 对于两个矩阵中的每个条目,应用一个函数f。返回两个矩阵M_u,M_v 求M_的本征值和本征向量。对于i=0,返回e_i,ev_i,。。。,n-1 计算每个特征向量的外积。返回一个矩阵O_i=e_i*转置(e_i),i=0,。。。,n-1 用e_i=e_i delta_i调整每个特征值,其

  • 我有两个RDDs。在Spark scala中,如果event1001RDD和event2009RDD具有相同的id,我该如何连接它们? Val事件1001RDD:模式RDD=[事件类型,id,位置,日期1] val event 2009 rdd:schemaRDD =[事件类型,id,日期1,日期2] 预期结果将是:(唯一)(按 id 排序) [事件类型,ID,1001 的位置,1001 的日期1

  • 我已经建立了一个Spark and Flink k-means应用程序。我的测试用例是一个3节点集群上的100万个点的集群。 当内存瓶颈开始时,Flink开始外包给磁盘,工作缓慢,但工作正常。然而,如果内存已满,Spark将失去执行器,并再次启动(无限循环?)。 我尝试在邮件列表的帮助下自定义内存设置,谢谢。但是火花仍然不起作用。 是否需要设置任何配置?我是说Flink的记忆力很差,斯帕克也必须能

  • 当我将大数据保存到hdfs时,我正在体验OOME 我在Spark-Submit中使用这个: 当我增加框架时,现在的错误是:Java.lang.outofMemoryError:Java堆空间,所以我必须将驱动程序内存和执行程序内存增加到2G才能工作。如果累加Collection.value.length是500,000,我需要使用3G。这正常吗? 该文件只有146MB,包含200,000行(对于2

  • 这将产生以下结果 我所感兴趣的实际上只是上面结果中的列表,我希望理想地将其作为groupby操作的一部分来完成。我知道这是可以做到的,例如,通过循环结果映射结构。但是有没有一种方法可以使用流来实现它呢?

  • 假设我希望根据的对其进行分区。 通过覆盖方法对进行分区,并且只使用的hashcode是否正确? 但是,鉴于接受了许多分区参数,我不知道是否需要事先知道种类的数量,如果种类多于分区,会发生什么? 我的目标是打电话 并且在迭代器中只有具有相同的值。