当前位置: 首页 > 知识库问答 >
问题:

火花:如何使用映射分区和创建/关闭每个分区的连接

司徒良哲
2023-03-14

所以,我想对我的spark数据帧执行某些操作,将它们写入DB,并在最后创建另一个数据帧。看起来是这样的:

import sqlContext.implicits._

val newDF = myDF.mapPartitions(
  iterator => {
    val conn = new DbConnection
    iterator.map(
       row => {
         addRowToBatch(row)
         convertRowToObject(row)
     })
    conn.writeTheBatchToDB()
    conn.close()
  })
  .toDF()

这给我一个错误,因为map分区期望返回Iterator[NotInferedR]的类型,但这里是Unit。我知道这在forEach分区中是可能的,但我也想做映射。单独做会有开销(额外的火花工作)。该怎么办?

谢谢

共有2个答案

呼延靖
2023-03-14

匿名函数实现中的最后一个表达式必须是返回值:

import sqlContext.implicits._

val newDF = myDF.mapPartitions(
  iterator => {
    val conn = new DbConnection
    // using toList to force eager computation - make it happen now when connection is open
    val result = iterator.map(/* the same... */).toList
    conn.writeTheBatchToDB()
    conn.close()
    result.iterator
  }
).toDF()
危阳
2023-03-14

在大多数情况下,如果不减慢作业速度,急于消耗迭代器将导致执行失败。因此,我所做的是检查迭代器是否已经为空,然后执行清理例程。

rdd.mapPartitions(itr => {
    val conn = new DbConnection
    itr.map(data => {
       val yourActualResult = // do something with your data and conn here
       if(itr.isEmpty) conn.close // close the connection
       yourActualResult
    })
})

起初认为这是一个火花问题,但实际上是一个scala问题。http://www.scala-lang.org/api/2.12.0/scala/collection/Iterator.html#isEmpty:Boolean

 类似资料:
  • 我如何使用胶水/火花转换成拼花,这也是分区的日期和分裂在n个文件每天?。这些示例不包括分区、拆分或供应(多少节点和多大节点)。每天包含几百GBS。 因为源CSV不一定在正确的分区中(错误的日期),并且大小不一致,所以我希望用正确的分区和更一致的大小写到分区的parquet。

  • 我们开始在团队中尝试spark。在我们减少spark中的工作后,我们希望将结果写入S3,但是我们希望避免收集Spark结果。目前,我们正在为RDD的每个分区写文件,但是这会产生很多小文件。我们希望能够将数据聚合到几个文件中,这些文件按照写入文件的对象数量进行分区。例如,我们的总数据是100万个对象(这是不变的),我们希望生成40万个对象文件,而我们当前的分区生成大约2万个对象文件(这因每个作业而异

  • 如何根据列中项数的计数来分区DataFrame。假设我们有一个包含100人的DataFrame(列是和),我们希望为一个国家中的每10个人创建一个分区。 如果我们的数据集包含来自中国的80人,来自法国的15人,来自古巴的5人,那么我们需要8个分区用于中国,2个分区用于法国,1个分区用于古巴。 下面是无法工作的代码: null 有什么方法可以动态设置每个列的分区数吗?这将使创建分区数据集变得更加容易

  • 我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。 案例1 案例2 案例3 案例4 文件/home/pvikash/data/test的内容。txt是: 这是一个测试文件。将用于rdd分区 基于以上案例,我有几个问题。 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1? 对于案例3,为什么在指定数量的

  • 我有5个表存储为CSV文件(A.CSV、B.CSV、C.CSV、D.CSV、E.CSV)。每个文件按日期分区。如果文件夹结构如下:

  • 谁能给我解释一下吗? 然而,另一方面是,对于不能保证产生已知分区的转换,输出RDD将没有分区器集。例如,如果对哈希分区的键/值对RDD调用map(),则传递给map()的函数在理论上可以更改每个元素的键,因此结果将不会有分区器。Spark不会分析函数以检查它们是否保留密钥。相反,它提供了另外两个操作,mapValues()和flatMap Values(),它们保证每个元组的键保持不变。 Mate