当前位置: 首页 > 知识库问答 >
问题:

使用Spark 2.0.0中的stat.bloomFilter过滤另一个数据帧

齐阳
2023-03-14

我有两个大型数据帧[a],一个具有由id[b]标识的所有事件id列表。我想根据 [b] 中的 id 筛选 [a] 使用 spark 2.0.0 中的 stat.bloom筛选实现

然而,我在数据集API中看不到任何将布隆过滤器加入数据框的操作[a]

val in1 = spark.sparkContext.parallelize(List(0, 1, 2, 3, 4, 5))
val df1 = in1.map(x => (x, x+1, x+2)).toDF("c1", "c2", "c3")

val in2 = spark.sparkContext.parallelize(List(0, 1, 2))
val df2 = in2.map(x => (x)).toDF("c1")

val expectedNumItems: Long = 1000
val fpp: Double = 0.005

val sbf = df.stat.bloomFilter($"c1", expectedNumItems, fpp)
val sbf2 = df2.stat.bloomFilter($"c1", expectedNumItems, fpp)

根据 df2 中的值筛选“df1”的最佳方法是什么?

谢谢!

共有3个答案

何安宜
2023-03-14

构建了一个隐式类来包装https://stackoverflow.com/a/41989703/6723616评论欢迎!

/**
  * Copyright 2017 Yahoo, Inc.
  * Zlib license: https://www.zlib.net/zlib_license.html
  */

package me.klotz.spark.utils

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.util.sketch.BloomFilter
import org.apache.spark.SparkContext

object BloomFilterEnhancedJoin {

  // not parameterized for field typel; assumes string
  /**
    *  Like .join(bigDF, smallDF, but accelerated with a Bloom filter.
    *  You pass in a size estimate of the bigDF, and a ratio of acceptable false positives out of the expected result set size.
    *  ratio=1 is a good start; that will result in about 50% false positives in the big-small join, so the filter accepts
    *  about as many as it passes, rather than rejecting almost all.  Pass in a size estimate of the big dataframe
    *  to avoid enumerating it.  The small DataFrame gets enumerated anyway.
    *  
    *  Example use:
    *  <code>
    *  import me.klotz.spark.utils.BloomFilterEnhancedJoin._
    *  val (dups_joined, bloomFilterBroadcast) = df_big.joinBloom(1024L*1024L*1024L, dups, 10.0, "id")
    *  dups_joined.write.format("orc").save("dups")
    *  bloomFilterBroadcast.unpersist
    *  <code>
    */
  implicit class BloomFilterEnhancedJoiner(bigdf:Dataset[Row]) {
    /**
      * You should call bloomFilterBroadcast.unpersist after
      */
    def joinBloom(bigDFCountEstimate:Long, smallDF: Dataset[Row], ratio:Double, field:String) = {
      val sc = smallDF.sparkSession.sparkContext
      val smallDFCount = smallDF.count
      val fpr = smallDFCount.toDouble / bigDFCountEstimate.toDouble / ratio
      println(s"fpr=${fpr} = smallDFCount=${smallDFCount} / bigDFCountEstimate=${bigDFCountEstimate} / ratio=${ratio}")

      val bloomFilterBroadcast = sc.broadcast((smallDF.stat.bloomFilter(field, smallDFCount, fpr)))
      val mightContain = udf((x: String) => if (x != null) bloomFilterBroadcast.value.mightContainString(x) else false)

      (bigdf.filter(mightContain(col(field))).join(smallDF, field), bloomFilterBroadcast)
    }
  }

}
萧光华
2023-03-14

我想我找到了正确的方法来做到这一点,但仍然希望有指针来看看是否有更好的方法来管理它。

这是我的解决方案 -

val in1 = spark.sparkContext.parallelize(List(0, 1, 2, 3, 4, 5))
val d1 = in1.map(x => (x, x+1, x+2)).toDF("c1", "c2", "c3")

val in2 = spark.sparkContext.parallelize(List(0, 1, 2))
val d2 = in2.map(x => (x)).toDF("c1")

val s2 = d2.stat.bloomFilter($"c1", expectedNumItems, fpp)

val a = spark.sparkContext.broadcast(s2)

val x = d1.rdd.filter(x => a.value.mightContain(x(0)))

case class newType(c1: Int, c2: Int, c3: Int) extends Serializable

val xDF = x.map(y => newType(y(0).toString.toInt, y(1).toString.toInt, y(2).toString.toInt)).toDF()

scala> d1.show(10)
+---+---+---+
| c1| c2| c3|
+---+---+---+
|  0|  1|  2|
|  1|  2|  3|
|  2|  3|  4|
|  3|  4|  5|
|  4|  5|  6|
|  5|  6|  7|
+---+---+---+

scala> d2.show(10)
+---+
| c1|
+---+
|  0|
|  1|
|  2|
+---+

scala> xDF.show(10)
+---+---+---+
| c1| c2| c3|
+---+---+---+
|  0|  1|  2|
|  1|  2|  3|
|  2|  3|  4|
+---+---+---+
邵沛
2023-03-14

您可以使用自定义项:

def might_contain(f: org.apache.spark.util.sketch.BloomFilter) = udf((x: Int) => 
  if(x != null) f.mightContain(x) else false)

df1.where(might_contain(sbf2)($"C1"))
 类似资料:
  • 问题内容: 我有两个NumPy数组,例如: 和一个过滤器数组,例如: 我如何才能得到一个新的numpy数组,其中仅包含其中相同索引为True的值?就我而言:。 根据公认的解决方案(具有不同的值): 问题答案: NumPy支持布尔索引 假设 和是NumPy数组而不是Python列表(如问题所示)。您可以使用进行转换。

  • 问题内容: 我有一个过滤器linkifyStuff,其中需要使用其他过滤器处理一些变量。我无法弄清楚从另一个调用一个过滤器的语法。 我了解过滤器链接-这不是我想要的。我想将过滤器应用于linkifyStuff过滤器中的局部变量,而不是其输入或输出。 我希望像下面这样工作,但是$ filter(’filtername’)显然不是正确的语法。 我可以为sanitizeStuff和sanitizeStu

  • 问题内容: 我想了解 从另一个数组的所有元素过滤数组 的最佳方法。我尝试使用过滤器功能,但是如何给它提供要删除的值并没有解决。 就像是: 如果过滤器功能没有用,您将如何实现呢? 编辑:我检查了可能重复的问题,它可能对那些容易理解javascript的人有用。选中的答案很容易。 问题答案: 您可以使用函数的参数来避免将过滤器数组存储在全局变量中。

  • 我正在研究一个合作医疗系统。 我的代码在url调用的servlet的

  • 我有数据,其中因子标签已提供在单独的文件。因此,当我读到里面的东西时,我得到的数据如下所示: 和包含factor_x标签的单独数据帧,如下所示: 我正在寻找一种有效的方法来更新数据帧'data'中的factor_x_labels'中的标签。 我一直试图使用forcats包中的fct_recode或dplyr中的recode,但遇到了麻烦,因为(例如)现有的和更新的标签需要作为字符串粘贴,但需要用=

  • 我使用Room和RxJava,我想使用第二个的功能来过滤来自第一个的数据。 假设房间是返回用户。 谢谢