当前位置: 首页 > 知识库问答 >
问题:

Spark:每个Spark RDD分区和do mapPartition的DB连接

慎旭尧
2023-03-14

我想在我的spark rdd上做一个映射,

    val newRd = myRdd.mapPartitions(
      partition => {

        val connection = new DbConnection /*creates a db connection per partition*/

        val newPartition = partition.map(
           record => {
             readMatchingFromDB(record, connection)
         })
        connection.close()
        newPartition
      })

但是,这给了我一个已经关闭的连接异常,正如预期的那样,因为在控件到达. map()之前,我的连接是关闭的。我想为每个RDD分区创建一个连接,并正确地关闭它。我如何实现这一点?

谢谢

共有2个答案

水昊阳
2023-03-14
rdd.foreachPartitionAsync(iterator->{

// this object will be cached inside each executor JVM. For the first time, the //connection will be created and hence forward, it will be reused. 
// Very useful for streaming apps
DBConn conn=DBConn.getConnection()
while(iterator.hasNext()) {
  conn.read();
}

});

public class DBConn{
private static dbObj=null;

//Create a singleton method that returns only one instance of this object
}

}
文英达
2023-03-14

正如在这里的讨论中提到的,这个问题源于迭代器分区上映射操作的惰性。这种惰性意味着,对于每个分区,都会创建并关闭一个连接,只有在以后(对RDD进行操作时)才会调用readMatchingFromDB

要解决这个问题,您应该在关闭连接之前强制遍历迭代器,例如将其转换为列表(然后返回):

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close()
  newPartition.iterator // create a new iterator
})
 类似资料:
  • 是否有一种方法可以使用datastax/spark-cassandra-connector来选择每个分区密钥的最新版本,该版本相当于Cassandra3.6和更高版本的每个分区限制选项? PerPartitionLimitTest域实体 卡桑德拉表: Maven引用:

  • 找到给定RDD的每个分区大小的最佳方法是什么。我正在尝试调试一个扭曲的分区问题,我尝试了以下方法: 它适用于小型RDD,但对于大型RDD,它会产生OOM错误。我的想法是,导致了这种情况的发生。但不管怎样,我只是想知道有没有更好的方法?

  • 我确实遵循了如何在Cosmos DB中查找逻辑分区计数和大小的答案,这导致我选择了“https://docs.microsoft.com/en-us/azure/cosmos-db/use-metrics#decision-the-aphultis-distributions”。然而,该报告不再出现在Azure Portal上。我得到的只是“通过贯穿和存储的顶级逻辑分区键”。我想要一个我的所有“逻

  • 所以,我想对我的spark数据帧执行某些操作,将它们写入DB,并在最后创建另一个数据帧。看起来是这样的: 这给我一个错误,因为map分区期望返回的类型,但这里是。我知道这在forEach分区中是可能的,但我也想做映射。单独做会有开销(额外的火花工作)。该怎么办? 谢谢

  • /tmp/data/myfile1.csv,/tmp/data/myfile2.csv,/tmp/data.myfile3.csv,/tmp/datamyfile4.csv 我希望将这些文件读入Spark DataFrame或RDD,并且希望每个文件都是DataFrame的一个解析。我怎么能这么做?

  • 我有两个大的数据帧。每一行都有lat/lon数据。我的目标是在两个数据帧之间进行连接,并找到距离内的所有点,例如100m。 我想在geohash7上对df1和df2进行分区,然后只在分区内连接。我希望避免分区之间的连接以减少计算。 所以基本上加入geohash7,然后确保点之间的距离小于100。问题是,Spark实际上会交叉连接所有数据。如何使其只执行分区间连接而不执行分区内连接?