当前位置: 首页 > 知识库问答 >
问题:

Apache Spark在一个执行器上运行一个任务

卫嘉佑
2023-03-14

我有一个spark作业,它从数据库中读取数据,执行过滤、联合、2连接,最后将结果写回数据库。

然而,最后一个阶段仅在50个执行器中的一个执行器上运行一个任务。我试图增加分区的数量,使用哈希分区,但没有成功。

经过几个小时的谷歌搜索,似乎我的数据可能会扭曲,但我不知道如何解决它。

有什么建议吗?

规格:

    < li >独立群集 < li>120核心 < li>400G内存

遗嘱 执行 人:

  • 30个执行器(4核/执行器)
  • 每个遗嘱执行人13G
  • 4G驱动内存

代码段

    ...

  def main(args: Array[String]) {

    ....

    import sparkSession.implicits._


    val similarityDs = sparkSession.read.format("jdbc").options(opts).load
    similarityDs.createOrReplaceTempView("locator_clusters")

    val  ClassifierDs = sparkSession.sql("select *  " +
                                              "from locator_clusters where " +
                                              "relative_score >= 0.9 and " +
                                              "((content_hash_id is not NULL or content_hash_id <> '') " +
                                              "or (ref_hash_id is not NULL or ref_hash_id <> ''))").as[Hash].cache()



    def nnHash(tag: String) = (tag.hashCode & 0x7FFFFF).toLong


    val contentHashes = ClassifierDs.map(locator => (nnHash(locator.app_hash_id), Member(locator.app_hash_id,locator.app_hash_id, 0, 0, 0))).toDF("id", "member").dropDuplicates().alias("ch").as[IdMember]
    val similarHashes = ClassifierDs.map(locator => (nnHash(locator.content_hash_id), Member(locator.app_hash_id, locator.content_hash_id, 0, 0, 0))).toDF("id", "member").dropDuplicates().alias("sh").as[IdMember]


    val missingContentHashes = similarHashes.join(contentHashes, similarHashes("id") === contentHashes("id"), "right_outer").select("ch.*").toDF("id", "member").as[IdMember]

    val locatorHashesRdd = similarHashes.union(missingContentHashes).cache()

    val vertices = locatorHashesRdd.map{ case row: IdMember=> (row.id, row.member) }.cache()

    val toHashId = udf(nnHash(_:String))

    val edgesDf =  ClassifierDs.select(toHashId($"app_hash_id"), toHashId($"content_hash_id"), $"raw_score", $"relative_score").cache()

    val edges = edgesDf.map(e => Edge(e.getLong(0), e.getLong(1), (e.getDouble(2), e.getDouble(2)))).cache()


    val graph = Graph(vertices.rdd, edges.rdd).cache()

    val sc = sparkSession.sparkContext

    val ccVertices =  graph.connectedComponents.vertices.cache()


    val ccByClusters = vertices.rdd.join(ccVertices).map({
                          case (id, (hash, compId)) => (compId, hash.content_hash_id, hash.raw_score, hash.relative_score, hash.size)
                      }).toDF("id", "content_hash_id", "raw_score", "relative_score", "size").alias("cc")


    val verticesDf  = vertices.map(x => (x._1, x._2.app_hash_id, x._2.content_hash_id, x._2.raw_score, x._2.relative_score, x._2.size))
                              .toDF("id", "app_hash_id", "content_hash_id", "raw_score", "relative_score", "size").alias("v")

    val superClusters = verticesDf.join(ccByClusters, "id")
                                  .select($"v.app_hash_id", $"v.app_hash_id", $"cc.content_hash_id", $"cc.raw_score", $"cc.relative_score", $"cc.size")
                                  .toDF("ref_hash_id", "app_hash_id", "content_hash_id", "raw_score", "relative_score", "size")



    val prop = new Properties()
    prop.setProperty("user", M_DB_USER)
    prop.setProperty("password", M_DB_PASSWORD)
    prop.setProperty("driver", "org.postgresql.Driver")


    superClusters.write
                 .mode(SaveMode.Append)
                 .jdbc(s"jdbc:postgresql://$M_DB_HOST:$M_DB_PORT/$M_DATABASE", MERGED_TABLE, prop)


    sparkSession.stop()

来自执行程序的Stderr

16/10/01 18:53:42 INFO ShuffleBlockFetcherIterator: Getting 409 non-empty blocks out of 2000 blocks
16/10/01 18:53:42 INFO ShuffleBlockFetcherIterator: Started 59 remote fetches in 5 ms
16/10/01 18:53:42 INFO ShuffleBlockFetcherIterator: Getting 2000 non-empty blocks out of 2000 blocks
16/10/01 18:53:42 INFO ShuffleBlockFetcherIterator: Started 59 remote fetches in 9 ms
16/10/01 18:53:43 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 896.0 MB to disk (1  time so far)
16/10/01 18:53:46 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 896.0 MB to disk (2  times so far)
16/10/01 18:53:48 INFO Executor: Finished task 1906.0 in stage 769.0 (TID 260306). 3119 bytes result sent to driver
16/10/01 18:53:51 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (3  times so far)
16/10/01 18:53:57 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (4  times so far)
16/10/01 18:54:03 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (5  times so far)
16/10/01 18:54:09 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (6  times so far)
16/10/01 18:54:15 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (7  times so far)
16/10/01 18:54:21 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (8  times so far)
16/10/01 18:54:27 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (9  times so far)
16/10/01 18:54:33 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (10  times so far)
16/10/01 18:54:39 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (11  times so far)
16/10/01 18:54:44 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (12  times so far)

共有1个答案

卢毅
2023-03-14

如果数据倾斜确实是这里的问题,并且所有键都散列到一个分区,那么您可以尝试使用完全笛卡尔积或带有预过滤数据的广播连接。让我们考虑下面的例子:

val left = spark.range(1L, 100000L).select(lit(1L), rand(1)).toDF("k", "v")

left.select(countDistinct($"k")).show
// +-----------------+
// |count(DISTINCT k)|
// +-----------------+
// |                1|
// +-----------------+

任何试图用这样的数据连接的尝试都会导致严重的数据歪斜。现在假设我们可以使用另一个表,如下所示:

val right = spark.range(1L, 100000L).select(
  (rand(3) * 1000).cast("bigint"), rand(1)
).toDF("k", "v")

right.select(countDistinct($"k")).show
// +-----------------+
// |count(DISTINCT k)|
// +-----------------+
// |             1000|
// +-----------------+

如上所述,我们可以尝试两种方法:

> < li>

如果我们预计< code>right中对应于< code>key left的记录数量很少,我们可以使用广播连接:

type KeyType = Long
val keys = left.select($"k").distinct.as[KeyType].collect

val rightFiltered = broadcast(right.where($"k".isin(keys: _*)))
left.join(broadcast(rightFiltered), Seq("k"))

否则,我们可以执行<code>crossJoin</code>,然后执行<code>filter</code>:

left.as("left")
  .crossJoin(rightFiltered.as("right"))
  .where($"left.k" === $"right.k")

或者

spark.conf.set("spark.sql.crossJoin.enabled", true)

left.as("left")
  .join(rightFiltered.as("right"))
  .where($"left.k" === $"right.k")

如果存在稀有键和通用键的混合,您可以通过对稀有键执行标准连接并使用上面显示的通用方法之一来分离计算。

另一个可能的问题是< code>jdbc格式。如果不提供< code >谓词或分区列、边界和分区数量,所有数据都由一个执行器加载。

 类似资料:
  • 我正在从事一个spring boot项目,以自动化与gradle的集成测试。我最近开始在一家新企业工作,我的同事们按如下方式运行集成测试:在构建中。gradle文件有一个集成测试任务 启动任务后,应用程序开始在指定端口运行,然后打开postman,导入集合并运行测试。 我的工作是找到一种方法来跳过额外的点击,即自动运行邮递员集合。第一个想法是使用postman-run gradle插件,但由于企业

  • 问题内容: 我有以下使用类的课程。所以我想做的是,在运行cp1实例处理方法的同时,我要并行运行。 但是,我要按顺序cp1,所以我要它运行并完成,如果cp2没有完成或失败,那就很好。如果确实失败,我想加入结果。该示例中未返回任何内容,但我想返回结果。 为此,应该使用TaskExecutor吗?还是线程? 我只希望cp2与cp1并行运行。或者,如果我添加更多内容,例如说cp3,我希望它也可以与cp1并

  • 我已经创建了3个任务。Task3取决于Task1和Task2的结果。在调试代码时,它会正确执行,但在运行应用程序时,Task3会在Task1和Task2完成之前执行。 示例代码: 提前谢谢。

  • 我在gradle项目中添加了一个任务: 现在,任务总是在任务之前运行。这很好,因为构建任务包含许多步骤。现在我想显式禁用其中一个包含的任务。

  • 我有一个背景图像滚动不断,但我放置了一个菜单在它上面,需要一个按键和音乐在上面。但是,我似乎无法启动keylistener。我的假设是,这是由于for循环,它在到达它的终点之前重新设置了自己。 我希望输出在for循环中运行背景图像,当按下向下箭头时,标题屏幕会改变,但它只运行背景循环

  • 我在同一个drl文件中有两个Drools规则,如下所示: 我的想法是用第一条规则处理所有处于临界状态的事件。然后使用第二个规则,如果任何阀门有警报,这是由“如果临界”规则设置的,发送一个短信。 你知道吗?用口水可能吗?