我有两个大型数据帧[a],一个具有由id[b]标识的所有事件id列表。我想根据 [b] 中的 id 筛选 [a] 使用 spark 2.0.0 中的 stat.bloom筛选实现
然而,我在数据集API中看不到任何将布隆过滤器加入数据框的操作[a]
val in1 = spark.sparkContext.parallelize(List(0, 1, 2, 3, 4, 5))
val df1 = in1.map(x => (x, x+1, x+2)).toDF("c1", "c2", "c3")
val in2 = spark.sparkContext.parallelize(List(0, 1, 2))
val df2 = in2.map(x => (x)).toDF("c1")
val expectedNumItems: Long = 1000
val fpp: Double = 0.005
val sbf = df.stat.bloomFilter($"c1", expectedNumItems, fpp)
val sbf2 = df2.stat.bloomFilter($"c1", expectedNumItems, fpp)
根据 df2 中的值筛选“df1”的最佳方法是什么?
谢谢!
我构建了一个隐式类来包装https://stackoverflow.com/a/41989703/6723616评论欢迎!
/**
* Copyright 2017 Yahoo, Inc.
* Zlib license: https://www.zlib.net/zlib_license.html
*/
package me.klotz.spark.utils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.util.sketch.BloomFilter
import org.apache.spark.SparkContext
object BloomFilterEnhancedJoin {
// not parameterized for field typel; assumes string
/**
* Like .join(bigDF, smallDF, but accelerated with a Bloom filter.
* You pass in a size estimate of the bigDF, and a ratio of acceptable false positives out of the expected result set size.
* ratio=1 is a good start; that will result in about 50% false positives in the big-small join, so the filter accepts
* about as many as it passes, rather than rejecting almost all. Pass in a size estimate of the big dataframe
* to avoid enumerating it. The small DataFrame gets enumerated anyway.
*
* Example use:
* <code>
* import me.klotz.spark.utils.BloomFilterEnhancedJoin._
* val (dups_joined, bloomFilterBroadcast) = df_big.joinBloom(1024L*1024L*1024L, dups, 10.0, "id")
* dups_joined.write.format("orc").save("dups")
* bloomFilterBroadcast.unpersist
* <code>
*/
implicit class BloomFilterEnhancedJoiner(bigdf:Dataset[Row]) {
/**
* You should call bloomFilterBroadcast.unpersist after
*/
def joinBloom(bigDFCountEstimate:Long, smallDF: Dataset[Row], ratio:Double, field:String) = {
val sc = smallDF.sparkSession.sparkContext
val smallDFCount = smallDF.count
val fpr = smallDFCount.toDouble / bigDFCountEstimate.toDouble / ratio
println(s"fpr=${fpr} = smallDFCount=${smallDFCount} / bigDFCountEstimate=${bigDFCountEstimate} / ratio=${ratio}")
val bloomFilterBroadcast = sc.broadcast((smallDF.stat.bloomFilter(field, smallDFCount, fpr)))
val mightContain = udf((x: String) => if (x != null) bloomFilterBroadcast.value.mightContainString(x) else false)
(bigdf.filter(mightContain(col(field))).join(smallDF, field), bloomFilterBroadcast)
}
}
}
我想我找到了正确的方法来做到这一点,但仍然希望有指针来看看是否有更好的方法来管理它。
这是我的解决方案 -
val in1 = spark.sparkContext.parallelize(List(0, 1, 2, 3, 4, 5))
val d1 = in1.map(x => (x, x+1, x+2)).toDF("c1", "c2", "c3")
val in2 = spark.sparkContext.parallelize(List(0, 1, 2))
val d2 = in2.map(x => (x)).toDF("c1")
val s2 = d2.stat.bloomFilter($"c1", expectedNumItems, fpp)
val a = spark.sparkContext.broadcast(s2)
val x = d1.rdd.filter(x => a.value.mightContain(x(0)))
case class newType(c1: Int, c2: Int, c3: Int) extends Serializable
val xDF = x.map(y => newType(y(0).toString.toInt, y(1).toString.toInt, y(2).toString.toInt)).toDF()
scala> d1.show(10)
+---+---+---+
| c1| c2| c3|
+---+---+---+
| 0| 1| 2|
| 1| 2| 3|
| 2| 3| 4|
| 3| 4| 5|
| 4| 5| 6|
| 5| 6| 7|
+---+---+---+
scala> d2.show(10)
+---+
| c1|
+---+
| 0|
| 1|
| 2|
+---+
scala> xDF.show(10)
+---+---+---+
| c1| c2| c3|
+---+---+---+
| 0| 1| 2|
| 1| 2| 3|
| 2| 3| 4|
+---+---+---+
您可以使用自定义项:
def might_contain(f: org.apache.spark.util.sketch.BloomFilter) = udf((x: Int) =>
if(x != null) f.mightContain(x) else false)
df1.where(might_contain(sbf2)($"C1"))
问题内容: 我有两个NumPy数组,例如: 和一个过滤器数组,例如: 我如何才能得到一个新的numpy数组,其中仅包含其中相同索引为True的值?就我而言:。 根据公认的解决方案(具有不同的值): 问题答案: NumPy支持布尔索引 假设 和是NumPy数组而不是Python列表(如问题所示)。您可以使用进行转换。
问题内容: 我有一个过滤器linkifyStuff,其中需要使用其他过滤器处理一些变量。我无法弄清楚从另一个调用一个过滤器的语法。 我了解过滤器链接-这不是我想要的。我想将过滤器应用于linkifyStuff过滤器中的局部变量,而不是其输入或输出。 我希望像下面这样工作,但是$ filter(’filtername’)显然不是正确的语法。 我可以为sanitizeStuff和sanitizeStu
问题内容: 我想了解 从另一个数组的所有元素过滤数组 的最佳方法。我尝试使用过滤器功能,但是如何给它提供要删除的值并没有解决。 就像是: 如果过滤器功能没有用,您将如何实现呢? 编辑:我检查了可能重复的问题,它可能对那些容易理解javascript的人有用。选中的答案很容易。 问题答案: 您可以使用函数的参数来避免将过滤器数组存储在全局变量中。
我正在研究一个合作医疗系统。 我的代码在url调用的servlet的
我有数据,其中因子标签已提供在单独的文件。因此,当我读到里面的东西时,我得到的数据如下所示: 和包含factor_x标签的单独数据帧,如下所示: 我正在寻找一种有效的方法来更新数据帧'data'中的factor_x_labels'中的标签。 我一直试图使用forcats包中的fct_recode或dplyr中的recode,但遇到了麻烦,因为(例如)现有的和更新的标签需要作为字符串粘贴,但需要用=
我使用Room和RxJava,我想使用第二个的功能来过滤来自第一个的数据。 假设房间是返回用户。 谢谢