当前位置: 首页 > 知识库问答 >
问题:

如何并行化Spark scala计算?

唐兴思
2023-03-14

当我使用spark API运行类似的代码时,它在许多不同的(分布式)作业中运行,并且成功运行。当我运行它时,我的代码(应该做与Spark代码相同的事情),我得到一个堆栈溢出错误。知道为什么吗?

代码如下:

import java.util.Arrays
        import org.apache.spark.mllib.linalg.{Vectors, Vector}
        import org.apache.spark.mllib.linalg._
        import org.apache.spark.mllib.linalg.distributed.RowMatrix
        import org.apache.spark.rdd.RDD
        import org.apache.spark.api.java.JavaRDD
        import breeze.linalg.{axpy => brzAxpy, inv, svd => brzSvd, DenseMatrix => BDM, DenseVector => BDV,
          MatrixSingularException, SparseVector => BSV, CSCMatrix => BSM, Matrix => BM}

        val EPSILON = {
            var eps = 1.0
            while ((1.0 + (eps / 2.0)) != 1.0) {
              eps /= 2.0
            }
            eps
          }

        def dot(x: Vector, y: Vector): Double = {
            require(x.size == y.size,
              "BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes:" +
              " x.size = " + x.size + ", y.size = " + y.size)
            (x, y) match {
              case (dx: DenseVector, dy: DenseVector) =>
                dot(dx, dy)
              case (sx: SparseVector, dy: DenseVector) =>
                dot(sx, dy)
              case (dx: DenseVector, sy: SparseVector) =>
                dot(sy, dx)
              case (sx: SparseVector, sy: SparseVector) =>
                dot(sx, sy)
              case _ =>
                throw new IllegalArgumentException(s"dot doesn't support (${x.getClass}, ${y.getClass}).")
            }
         }

         def fastSquaredDistance(
              v1: Vector,
              norm1: Double,
              v2: Vector,
              norm2: Double,
              precision: Double = 1e-6): Double = {
            val n = v1.size
            require(v2.size == n)
            require(norm1 >= 0.0 && norm2 >= 0.0)
            val sumSquaredNorm = norm1 * norm1 + norm2 * norm2
            val normDiff = norm1 - norm2
            var sqDist = 0.0
            /*
             * The relative error is
             * <pre>
             * EPSILON * ( \|a\|_2^2 + \|b\\_2^2 + 2 |a^T b|) / ( \|a - b\|_2^2 ),
             * </pre>
             * which is bounded by
             * <pre>
             * 2.0 * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 ) / ( (\|a\|_2 - \|b\|_2)^2 ).
             * </pre>
             * The bound doesn't need the inner product, so we can use it as a sufficient condition to
             * check quickly whether the inner product approach is accurate.
             */
            val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)
            if (precisionBound1 < precision) {
              sqDist = sumSquaredNorm - 2.0 * dot(v1, v2)
            } else if (v1.isInstanceOf[SparseVector] || v2.isInstanceOf[SparseVector]) {
              val dotValue = dot(v1, v2)
              sqDist = math.max(sumSquaredNorm - 2.0 * dotValue, 0.0)
              val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dotValue)) /
                (sqDist + EPSILON)
              if (precisionBound2 > precision) {
                sqDist = Vectors.sqdist(v1, v2)
              }
            } else {
              sqDist = Vectors.sqdist(v1, v2)
            }
            sqDist
        }

        def findClosest(
              centers: TraversableOnce[Vector],
              point: Vector): (Int, Double) = {
            var bestDistance = Double.PositiveInfinity
            var bestIndex = 0
            var i = 0
            centers.foreach { center =>
              // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary
              // distance computation.
              var lowerBoundOfSqDist = Vectors.norm(center, 2.0) - Vectors.norm(point, 2.0)
              lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist
              if (lowerBoundOfSqDist < bestDistance) {
                val distance: Double = fastSquaredDistance(center, Vectors.norm(center, 2.0), point, Vectors.norm(point, 2.0))
                if (distance < bestDistance) {
                  bestDistance = distance
                  bestIndex = i
                }
              }
              i += 1
            }
            (bestIndex, bestDistance)
        }

         def pointCost(
              centers: TraversableOnce[Vector],
              point: Vector): Double =
            findClosest(centers, point)._2



        def clusterCentersIter: Iterable[Vector] =
            clusterCenters.map(p => p)


        def computeCostZep(indata: RDD[Vector]): Double = {
            val bcCenters = indata.context.broadcast(clusterCenters)
            indata.map(p => pointCost(bcCenters.value, p)).sum()
          }

        computeCostZep(projectedData)

我相信我正在使用与spark相同的所有并行化工作,但它对我不起作用。任何关于使我的代码分发/帮助了解为什么在我的代码中发生内存溢出的建议都将是非常有帮助的

val clusters = KMeans.train(projectedData, numClusters, numIterations)

val clusterCenters = clusters.clusterCenters




// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(projectedData)
println("Within Set Sum of Squared Errors = " + WSSSE)

共有1个答案

裴宜春
2023-03-14

正在发生的事情似乎很简单:您在这里递归调用dot方法:

def dot(x: Vector, y: Vector): Double = {
        require(x.size == y.size,
          "BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes:" +
          " x.size = " + x.size + ", y.size = " + y.size)
        (x, y) match {
          case (dx: DenseVector, dy: DenseVector) =>
            dot(dx, dy)
          case (sx: SparseVector, dy: DenseVector) =>
            dot(sx, dy)
          case (dx: DenseVector, sy: SparseVector) =>
            dot(sy, dx)
          case (sx: SparseVector, sy: SparseVector) =>
            dot(sx, sy)
          case _ =>
            throw new IllegalArgumentException(s"dot doesn't support (${x.getClass}, ${y.getClass}).")
        }
     }

dot的后续递归调用与前一个调用具有相同的参数-因此递归永远不会有结论。

stacktrace还告诉您--注意位置在dot方法处:

 类似资料:
  • 问题内容: 列表理解和映射计算都应该(至少在理论上)相对容易并行化:列表理解内的每个计算都可以独立于所有其他元素的计算来完成。例如在表达式中 每个x * x计算都可以(至少在理论上)并行完成。 我的问题是:是否有任何Python模块/ Python实施/ Python编程技巧可并行化列表理解计算(以便使用所有16​​/32 / …内核或将计算分布在计算机网格或在云上)? 问题答案: 正如Ken所说

  • 我已经用RxJava成功地完成了一个小型Java程序。代码为: 使用此代码,一切正常。现在我正在尝试将此代码传递给Android: 在finished()方法中,我正在更新GUI(finishedListener是当前活动正在实现的接口)。 我在使用map(I)的线路上遇到错误- 内置。gradle(用于应用程序)我正在使用: 我如何解决这个问题?

  • 问题内容: 我可以一次下载一个文件: 我可以这样尝试: 有没有不使用或作弊的并行化方法? 鉴于我现在必须诉诸“作弊”,是否是下载数据的正确方法? 使用上述方法时,它使用的是多线程而不是多核的,是否正常?有没有办法使它成为多核而不是多线程? 问题答案: 您可以使用线程池并行下载文件: 您还可以使用以下命令在一个线程中一次下载多个文件: 这里定义在哪里。

  • 我是spark新手,有一个简单的spark应用程序,使用spark SQL/hiveContext: 从hive表中选择数据(10亿行) 做一些过滤,聚合,包括row_number窗口函数来选择第一行,分组,计数()和最大()等。 将结果写入HBase(数亿行) 我提交的作业运行它在纱线集群(100个执行者),它很慢,当我在火花UI中查看DAG可视化时,似乎只有蜂巢表扫描任务并行运行,其余的步骤#

  • 是否有人有任何使用TBB有效并行std::分区的技巧?这已经完成了吗? 以下是我的想法: 如果数组很小,std::将其分区(串行)并返回 否则,使用自定义迭代器将数组视为2个交错数组(在缓存大小的块中交错) 为每对迭代器启动一个并行分区任务(递归到步骤1) 在两个分区/中间指针之间交换元素* 返回合并的分区/中间指针 *我希望在一般情况下,与数组的长度相比,或者与将数组划分为连续块时所需的交换相比

  • 9.3 并行计算* 计算思维是建立在计算机的能力和限制之上的,计算机科学家的任务是尽量发扬计算机 的能力,避开计算机的限制。传统的计算概念是在计算机发明之初形成的,就是由一个处理 器按顺序执行一个程序的所有指令。并行计算则突破了这种限制,试图让多个处理器同时做 事情。并行计算的好处是显然的,想想一个人吃一锅饭与一百个人同时吃一锅饭的差别,就 能理解并行计算的威力。 可以在不同层次上讨论并行。最底层