当前位置: 首页 > 知识库问答 >
问题:

Scala/Spark Apriori实现极其缓慢

祁曦哲
2023-03-14
import System.{exit, nanoTime}
import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.{Column, SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import spark.implicits._

object Main extends Serializable {
  val s = 0.03

  def loadFakeData() : DataFrame = {
    var data = Seq("1 ",
                   "1 2 ",
                   "1 2",
                   "3",
                   "1 2 3 ",
                   "1 2 ")
                .toDF("baskets_str")
                .withColumn("baskets", split('baskets_str, " ").cast("array<int>"))
      data
  }
  
  def combo(a1: WrappedArray[Int], a2: WrappedArray[Int]): Array[Array[Int]] = {
    var a = a1.toSet
    var b = a2.toSet
    var res = a.diff(b).map(b+_) ++ b.diff(a).map(a+_)
    return res.map(_.toArray.sortWith(_ < _)).toArray
  }
  val comboUDF = udf[Array[Array[Int]], WrappedArray[Int], WrappedArray[Int]](combo)

  def getCombinations(df: DataFrame): DataFrame = {
    df.crossJoin(df.withColumnRenamed("itemsets", "itemsets_2"))
      .withColumn("combinations", comboUDF(col("itemsets"), col("itemsets_2")))
      .select("combinations")
      .withColumnRenamed("combinations", "itemsets")
      .withColumn("itemsets", explode(col("itemsets")))
      .dropDuplicates()
  }

  def countCombinations(data : DataFrame, combinations: DataFrame) : DataFrame = {
    data.crossJoin(combinations)
      .where(size(array_intersect('baskets, 'itemsets)) === size('itemsets))
      .groupBy("itemsets")
      .count
  }

  def freq() {
    val spark = SparkSession.builder.appName("FreqItemsets")
      .master("local[*]")
      .getOrCreate()

    // data is a dataframe where each row contains an array of integer values
    var data = loadFakeData()
    val basket_count = data.count

    // Itemset is a dataframe containing all possible sets of 1 element
    var itemset : DataFrame = data
                                .select(explode('baskets))
                                .na.drop
                                .dropDuplicates()
                                .withColumnRenamed("col", "itemsets")
                                .withColumn("itemsets", array('itemsets))
    var itemset_count : DataFrame = countCombinations(data, itemset).filter('count > s*basket_count)
    var itemset_counts = List(itemset_count)

    // We iterate creating each time itemsets of length k+1 from itemsets of length k
    // pruning those that do not have enough support
    var stop = (itemset_count.count == 0)
    while(!stop) {
      itemset = getCombinations(itemset_count.select("itemsets"))
      itemset_count = countCombinations(data, itemset).filter('count > s*basket_count)
      stop = (itemset_count.count == 0)
      if (!stop) {
        itemset_counts = itemset_counts :+ itemset_count
      }
    }

    spark.stop()
  }
}

共有1个答案

慕容渊
2023-03-14

由于Spark保留了在任何时候重新生成数据集的权限,所以可能会发生这种情况,在这种情况下,缓存昂贵转换的结果可以显著提高性能。

在本例中,乍一看itemset是重灾区,所以

itemset = getCombinations(itemset_count.select("itemsets")).cache

可能会支付股息。

itemset_counts = itemset_count :: itemset_counts
 类似资料:
  • 为了更好理解Hooks原理,这一节我们遵循React的运行流程,实现一个不到100行代码的极简useState Hook。建议对照着代码来看本节内容。 工作原理 对于useState Hook,考虑如下例子: function App() { const [num, updateNum] = useState(0); return <p onClick={() => updateNum(

  • ▲BIOS是什么? 所谓 BIOS,实际上就是微机的基本输入输出系统(Basic Input System),其内容集成在微机主板上的一个ROM芯片上,主要保存着有关微机系统最重要的基本输入输出程序,系统信息设置,开机上电自检程序和系统启动自举程序等。 ▲BIOS的功能 BIOS ROM 芯片不但可以在主板上看到,而且BIOS管理功能如何在很大程度上决定了主板性能是否优越。BIOS管理功能包括:

  • 嗨,我是Scala的新手,想知道如何将一个简单的ListNode类从Java改写成Scala。 在java中,它如下所示,我可以创建一个head node head=new ListNode(0),然后设置head.next=new ListNode(1) 但是我发现在Scala中很难重写相同的逻辑,下面是ListNode的case类 当我试图实例化一个head节点,并实例化另一个新节点并设置he

  • 我在做什么:我正在用C编写一个象棋引擎。我最近更新了我的引擎的minimax搜索算法,该算法使用alpha-beta修剪来利用迭代深化,以便在时间限制下运行。这是它的外观: 我的问题:这个实现的问题是,当搜索任何大于1的深度时,它将在搜索所需深度之前搜索所有之前的深度。也就是说,此迭代深化搜索首先搜索深度为1的所有移动。然后,它将再次搜索深度1,然后再搜索深度2,而不是在下一次搜索时选择深度2。然

  • php scala ruby

  • “会话”接口有两个方法,在Scala中,由于类型擦除,它们被简化为具有相同的签名: 试图实现它们会产生错误: 错误:双重定义:第199行的方法createstoredprocedurecall:(procedureName: String,resultset mappings:string *)org . hibernate . procedure . procedure call和方法creat