当前位置: 首页 > 知识库问答 >
问题:

Apache Spark-foreach Vs foreachPartitions何时使用什么?

傅毅然
2023-03-14

我想知道,与foreach方法相比,foreach是否会由于更高级别的并行性而导致更好的性能,考虑到我正在通过RDD对累加器变量执行一些求和。

共有1个答案

苏伟志
2023-03-14

foreachforeachpartitions是操作。

用于调用具有副作用的操作的泛型函数。对于RDD中的每个元素,它调用传递的函数。这通常用于操作累加器或写入外部存储。

注意:在foreach()之外修改累加器以外的变量可能会导致未定义的行为。有关更多详细信息,请参见了解闭包。

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10
    null
/**
    * Insert in to database using foreach partition.
    *
    * @param sqlDatabaseConnectionString
    * @param sqlTableName
    */
  def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {

    //numPartitions = number of simultaneous DB connections you can planning to give

datframe.repartition(numofpartitionsyouwant)

    val tableHeader: String = dataFrame.columns.mkString(",")
    dataFrame.foreachPartition { partition =>
      // Note : Each partition one connection (more better way is to use connection pools)
      val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
      //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql
      partition.grouped(1000).foreach {
        group =>
          val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder()
          group.foreach {
            record => insertString.append("('" + record.mkString(",") + "'),")
          }

          sqlExecutorConnection.createStatement()
            .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
              + insertString.stripSuffix(","))
      }


      sqlExecutorConnection.close() // close the connection so that connections wont exhaust.
    }
  }
  • 示例2:

foreachpartition与SparkStreams(dstreams)和kafka producer一起使用

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
// only once per partition You can safely share a thread-safe Kafka //producer instance.
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}

注意:如果您想避免这种每个分区创建一次生产者的方式,最好的方法是使用sparkcontext.broadcast广播生产者,因为Kafka生产者是异步的,并且在发送之前会大量缓冲数据。

累加器示例片段可以使用它...您可以通过它来测试性能

     test("Foreach - Spark") {
        import spark.implicits._
        var accum = sc.longAccumulator
        sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x))
        assert(accum.value == 6L)
      }

      test("Foreach partition - Spark") {
        import spark.implicits._
        var accum = sc.longAccumulator
        sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_)))
        assert(accum.value == 6L)
      }

foreachpartition对分区的操作,因此显然,edge比foreach更好

当您访问昂贵的资源(如数据库连接或kafka producer等)时,应使用foreachpartition。它将每个分区初始化一个而不是每个元素初始化一个(foreach)。当涉及到蓄能器时,您可以通过上述测试方法来测量性能,这些测试方法在蓄能器的情况下也应该工作得更快。

还有...参见map vs mappartitions,它们有相似的概念,但它们是转换。

 类似资料:
  • 我正在用Kafka设计一个spark流媒体应用程序。我有以下几个问题:我正在将数据从RDBMS表流式传输到kafka,并使用Spark consumer来使用消息,并使用Spark-SQL进行处理 问题:1。我将数据从表中流式传输到kafka as(键作为表名,值作为JSON记录形式的表数据)——这是正确的体系结构吗? 这种数据库流的架构和设计是否正常,我如何解决转换问题中的转换? 你好Piyus

  • 我有一个基于maven的scala/java混合应用程序,可以提交spar作业。我的应用程序jar“myapp.jar”在lib文件夹中有一些嵌套的jar。其中之一是“common.jar”。我在清单文件中定义了类路径属性,比如。Spark executor抛出在客户端模式下提交应用程序时出错。类(com/myapp/common/myclass.Class)和jar(common.jar)在那里

  • 我正在尝试创建一个spark应用程序,它对创建、读取、写入和更新MySQL数据非常有用。那么,有没有办法使用Spark创建一个MySQL表? 下面是在MySQL数据库中创建表的Scala JDBC代码。我怎样才能通过Spark做到这一点?

  • 问题内容: 什么是a ,何时应该使用它?a 和a有什么区别? 问题答案: 如果没有其他对键对象的强引用,则弱哈希图中的元素可以由垃圾回收器回收,这使它们对于缓存/查找存储很有用。 弱引用不限于这些哈希表,您可以对单个对象使用WeakReference。它们对于节省资源很有用,您可以保留对某些内容的引用,但在没有其他引用的情况下允许对其进行收集。(顺便说一句,强引用是普通的Java引用)。还有一些弱

  • 问题内容: 作为React世界的初学者,我想深入了解我使用时会发生什么以及使用该情况的情况。以下代码段的意义是什么? 问题答案: “孩子”到底是什么? React文档说,您可以在代表“通用框”并且不提前知道其子级的组件上使用。对我来说,这并没有真正清除一切。我可以肯定的是,这个定义很合理,但对我而言却不是。 我对操作的简单解释是, 它用于在调用组件时显示在开始和结束标记之间包含的所有内容。 一个简

  • 问题内容: 奇怪的是: 似乎或多或少被定义为。通过这种方式很容易产生错误: 一些fname意外地以else块结尾。修复很简单,我们应该改用它,但是从表面上看,这似乎是一种不错的pythonic方式,并且比“正确”的方式更具可读性。 由于字符串是不可变的,所以为什么字符串错误是什么技术细节?什么时候进行身份检查更好,什么时候进行平等检查更好? 问题答案: 据我所知,检查对象身份是否相等。由于没有强制

  • 问题内容: 我有一个将客户发送到另一个站点来处理付款的应用程序。客户之外的另一个站点在我们的服务器上调用一个页面,让我们知道付款的状态。被调用页面会检查付款应用程序提供的参数,并检查我们是否知道该交易。然后,它更新数据库以反映状态。这一切都无需与客户进行任何互动即可完成。 我个人选择将此功能实现为JSP,因为将文件拖放到文件系统中比编译和打包文件然后将条目添加到配置文件中要容易得多。 考虑到页面的

  • 我正在ApacheSpark上的数据库中构建一个族谱,使用递归搜索来查找数据库中每个人的最终父级(即族谱顶部的人)。 假设搜索id时返回的第一个人是正确的家长 它给出以下错误 “原因:org.apache.spark.SparkException:RDD转换和操作只能由驱动程序调用,不能在其他转换中调用;例如,