我想在我的spark rdd上做一个映射,
val newRd = myRdd.mapPartitions(
partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(
record => {
readMatchingFromDB(record, connection)
})
connection.close()
newPartition
})
但是,这给了我一个已经关闭的连接异常,正如预期的那样,因为在控件到达. map()
之前,我的连接
是关闭的。我想为每个RDD分区创建一个连接,并正确地关闭它。我如何实现这一点?
谢谢
rdd.foreachPartitionAsync(iterator->{
// this object will be cached inside each executor JVM. For the first time, the //connection will be created and hence forward, it will be reused.
// Very useful for streaming apps
DBConn conn=DBConn.getConnection()
while(iterator.hasNext()) {
conn.read();
}
});
public class DBConn{
private static dbObj=null;
//Create a singleton method that returns only one instance of this object
}
}
正如在这里的讨论中提到的,这个问题源于迭代器分区
上映射操作的惰性。这种惰性意味着,对于每个分区,都会创建并关闭一个连接,只有在以后(对RDD进行操作时)才会调用readMatchingFromDB
。
要解决这个问题,您应该在关闭连接之前强制遍历迭代器,例如将其转换为列表(然后返回):
val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB
connection.close()
newPartition.iterator // create a new iterator
})
是否有一种方法可以使用datastax/spark-cassandra-connector来选择每个分区密钥的最新版本,该版本相当于Cassandra3.6和更高版本的每个分区限制选项? PerPartitionLimitTest域实体 卡桑德拉表: Maven引用:
找到给定RDD的每个分区大小的最佳方法是什么。我正在尝试调试一个扭曲的分区问题,我尝试了以下方法: 它适用于小型RDD,但对于大型RDD,它会产生OOM错误。我的想法是,导致了这种情况的发生。但不管怎样,我只是想知道有没有更好的方法?
我确实遵循了如何在Cosmos DB中查找逻辑分区计数和大小的答案,这导致我选择了“https://docs.microsoft.com/en-us/azure/cosmos-db/use-metrics#decision-the-aphultis-distributions”。然而,该报告不再出现在Azure Portal上。我得到的只是“通过贯穿和存储的顶级逻辑分区键”。我想要一个我的所有“逻
所以,我想对我的spark数据帧执行某些操作,将它们写入DB,并在最后创建另一个数据帧。看起来是这样的: 这给我一个错误,因为map分区期望返回的类型,但这里是。我知道这在forEach分区中是可能的,但我也想做映射。单独做会有开销(额外的火花工作)。该怎么办? 谢谢
/tmp/data/myfile1.csv,/tmp/data/myfile2.csv,/tmp/data.myfile3.csv,/tmp/datamyfile4.csv 我希望将这些文件读入Spark DataFrame或RDD,并且希望每个文件都是DataFrame的一个解析。我怎么能这么做?
我有两个大的数据帧。每一行都有lat/lon数据。我的目标是在两个数据帧之间进行连接,并找到距离内的所有点,例如100m。 我想在geohash7上对df1和df2进行分区,然后只在分区内连接。我希望避免分区之间的连接以减少计算。 所以基本上加入geohash7,然后确保点之间的距离小于100。问题是,Spark实际上会交叉连接所有数据。如何使其只执行分区间连接而不执行分区内连接?