Question:

Spark column-wise word count

吕向阳
2023-03-14

We are trying to generate column-wise statistics of a dataset in Spark. In addition to using the summary function from the statistics library, we use the following procedure:

  • We identify the columns holding string values.

  • We generate key-value pairs for the whole dataset, using the column number as the key and the column's value as the value.

  • We generate a new map of the format (K,V) -> ((K,V),1).

  • We then use reduceByKey to find the counts of all unique values across all the columns. We cache this output to reduce further computation time.

  • In the next step we cycle through the columns with a for loop to find the statistics for every column.

  • We tried to replace the for loop with another map-reduce pass, but could not find a way to achieve it. Doing so would let us generate the column statistics for all columns in a single execution. The for-loop approach runs sequentially and is very slow.

    Code:

    //drops the header (sc.textFile puts the header line in partition 0)
    
        def dropHeader(data: RDD[String]): RDD[String] = {
             data.mapPartitionsWithIndex((idx, lines) =>
               // Iterator.drop returns a new iterator whose result must be used;
               // the original version discarded it and returned the stale iterator.
               if (idx == 0) lines.drop(1) else lines
             )
           }
    
        def retAtrTuple(x: String) = {
           val newX = x.split(",")
           for (h <- 0 until newX.length) 
              yield (h,newX(h))
        }

        // isNumeric was not defined in the original snippet; a minimal assumed version:
        def isNumeric(s: String): Boolean = scala.util.Try(s.toDouble).isSuccess
    
    
    
        val line = sc.textFile("hdfs://.../myfile.csv")
    
        val withoutHeader: RDD[String] = dropHeader(line)
    
        val kvPairs = withoutHeader.flatMap(retAtrTuple) //generates a key-value pair where key is the column number and value is column's value
    
    
        var bool_numeric_col = kvPairs.map{case (x,y) => (x,isNumeric(y))}.reduceByKey(_&&_).sortByKey()    //this contains column indexes as key and boolean as value (true for numeric and false for string type)
    
        var str_cols = bool_numeric_col.filter{case (x,y) => y == false}.map{case (x,y) => x}
        var num_cols = bool_numeric_col.filter{case (x,y) => y == true}.map{case (x,y) => x}
    
        var str_col = str_cols.collect()   //array holding the string column indexes
        var num_col = num_cols.collect()   //array holding the numeric column indexes
    
    
        val colCount = kvPairs.map((_,1)).reduceByKey(_+_)
        val e1 = colCount.map{case ((x,y),z) => (x,(y,z))}
        var numPairs = e1.filter{case (x,(y,z)) => str_col.contains(x) }
    
        //running for loops which needs to be parallelized/optimized as it sequentially operates on each column. Idea is to find the top10, bottom10 and number of distinct elements column wise
        for(i <- str_col){
           var total = numPairs.filter{case (x,(y,z)) => x==i}.sortBy(_._2._2)
           var leastOnes = total.take(10)
           println("leastOnes for Col" + i)
           leastOnes.foreach(println)
           var maxOnes = total.sortBy(-_._2._2).take(10)
           println("maxOnes for Col" + i)
           maxOnes.foreach(println)
           println("distinct for Col" + i + " is " + total.count)
        }
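    A possible single-pass alternative to the for loop above: since numPairs is already keyed by column index, aggregateByKey can maintain the top-10, bottom-10, and a distinct-value counter for every column in one job. This is only a sketch, not the original author's code; the Acc type and function names are illustrative, and it assumes numPairs: RDD[(Int, (String, Int))] as built above (one entry per unique value, so counting entries gives the distinct count).

```scala
import org.apache.spark.rdd.RDD

def columnStats(numPairs: RDD[(Int, (String, Int))], n: Int = 10)
    : RDD[(Int, (Seq[(String, Int)], Seq[(String, Int)], Long))] = {
  // Accumulator: (top-n by count, bottom-n by count, distinct-value count)
  type Acc = (Seq[(String, Int)], Seq[(String, Int)], Long)
  val zero: Acc = (Seq.empty, Seq.empty, 0L)
  // Fold one (value, count) pair into the accumulator.
  def add(acc: Acc, v: (String, Int)): Acc = (
    (acc._1 :+ v).sortBy(-_._2).take(n),
    (acc._2 :+ v).sortBy(_._2).take(n),
    acc._3 + 1)
  // Merge two partial accumulators from different partitions.
  def merge(a: Acc, b: Acc): Acc = (
    (a._1 ++ b._1).sortBy(-_._2).take(n),
    (a._2 ++ b._2).sortBy(_._2).take(n),
    a._3 + b._3)
  numPairs.aggregateByKey(zero)(add, merge)
}
```

    The sorts only ever touch at most 2n elements per accumulator, so the whole computation stays a single shuffle instead of one filter-and-sort job per column.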
    
  • 2 answers in total

    章哲茂
    2023-03-14

    Thanks to @Daniel Darabos for his answer. But there are some mistakes:

  • Mixed use of Map and collection.mutable.Map.

  • withDefault((i: Int) => new Heap(n)) does not insert the default heap into the map; the default is only returned on lookup, so values added via heaps(k) += ... were silently lost. The entries have to be created explicitly.

    Below is the modified code:

    //import org.apache.spark.util.BoundedPriorityQueue // Pretend it's not private. copy to your own folder and import it
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    object BoundedPriorityQueueTest {
    
      //  https://stackoverflow.com/questions/28166190/spark-column-wise-word-count
      def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
        // A heap that only keeps the top N values, so it has bounded size.
        type Heap = BoundedPriorityQueue[(Long, String)]
        // Get the word counts.
        val counts: RDD[((Int, String), Long)] =
        rdd.map(_ -> 1L).reduceByKey(_ + _)
        // In each partition create a column -> heap map.
        val perPartition: RDD[collection.mutable.Map[Int, Heap]] =
        counts.mapPartitions { items =>
          val heaps =
            collection.mutable.Map[Int, Heap]() // .withDefault((i: Int) => new Heap(n))
          for (((k, v), count) <- items) {
            println("\n---")
            println("before add " + ((k, v), count) + ", the map is: ")
            println(heaps)
            if (!heaps.contains(k)) {
              println("not contains key " + k)
              heaps(k) = new Heap(n)
              println(heaps)
            }
            heaps(k) += count -> v
            println("after add " + ((k, v), count) + ", the map is: ")
            println(heaps)
    
          }
          println(heaps)
          Iterator.single(heaps)
        }
        // Merge the per-partition heap maps into one.
        val merged: collection.mutable.Map[Int, Heap] =
        perPartition.reduce { (heaps1, heaps2) =>
          val heaps =
            collection.mutable.Map[Int, Heap]() //.withDefault((i: Int) => new Heap(n))
          println(heaps)
          for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
            for (cv <- heap) {
              heaps(k) += cv
            }
          }
          heaps
        }
        // Discard counts, return just the top strings.
        merged.mapValues(_.map { case (count, value) => value }).toMap
      }
    
      def main(args: Array[String]): Unit = {
        Logger.getRootLogger().setLevel(Level.FATAL) //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
        val conf = new SparkConf().setAppName("word count").setMaster("local[1]")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN") //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
    
    
        val words = sc.parallelize(List((1, "s11"), (1, "s11"), (1, "s12"), (1, "s13"), (2, "s21"), (2, "s22"), (2, "s22"), (2, "s23")))
        println("# words:" + words.count())
    
        val result = top(1, words)
    
        println("\n--result:")
        println(result)
        sc.stop()
    
        print("DONE")
      }
    
    }
    

  • 杜经艺
    2023-03-14

    Let me simplify your question a bit. (A lot, actually.) We have an RDD[(Int, String)] and we want to find the top 10 most common Strings for each Int (which are all in the 0–100 range).

    Instead of sorting, as in your example, it is more efficient to use Spark's built-in RDD.top(n) method. Its run-time is linear in the size of the data, and it moves much less data around than a sort would.
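
    For illustration (the data here is hypothetical, not from the post): top takes an implicit Ordering, so the n most frequent words of a counted RDD can be pulled without a full sort:

```scala
// top(n) returns the n largest elements by the given Ordering, in descending order.
val counts = sc.parallelize(Seq(("s11", 3L), ("s12", 1L), ("s13", 2L)))
val top2 = counts.top(2)(Ordering.by[(String, Long), Long](_._2))
// e.g. Array(("s11", 3), ("s13", 2))
```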

    Take a look at the implementation of top in RDD.scala. You want to do the same, but with one priority queue (heap) per Int key. The code becomes fairly complex:

    import org.apache.spark.util.BoundedPriorityQueue // Pretend it's not private.
    
    def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
      // A heap that only keeps the top N values, so it has bounded size.
      type Heap = BoundedPriorityQueue[(Long, String)]
      // Get the word counts.
      val counts: RDD[((Int, String), Long)] =
        rdd.map(_ -> 1L).reduceByKey(_ + _)
      // In each partition create a column -> heap map.
      val perPartition: RDD[Map[Int, Heap]] =
        counts.mapPartitions { items =>
          val heaps =
            collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
          for (((k, v), count) <- items) {
            heaps(k) += count -> v
          }
          Iterator.single(heaps)
        }
      // Merge the per-partition heap maps into one.
      val merged: Map[Int, Heap] =
        perPartition.reduce { (heaps1, heaps2) =>
          val heaps =
            collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
          for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
            for (cv <- heap) {
              heaps(k) += cv
            }
          }
          heaps
        }
      // Discard counts, return just the top strings.
      merged.mapValues(_.map { case(count, value) => value })
    }
    

    This works, but it is painful, because we have to process all the columns at the same time. It would be much easier if we had one RDD per column and could simply call rdd.top(10) on each.

    Unfortunately, the naive way to split the RDD into N smaller RDDs makes N passes over the data:

    def split(together: RDD[(Int, String)], columns: Int): Seq[RDD[String]] = {
      together.cache // We will make N passes over this RDD.
      (0 until columns).map {
        i => together.filter { case (key, value) => key == i }.values
      }
    }
    

    A more efficient solution could be to write the data out into separate files by key, then load it back into separate RDDs. This is discussed in Write to multiple outputs by key Spark - one Spark job.
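
    A sketch of that write-by-key idea, following the pattern usually given in that discussion (the output path and class name are hypothetical; this uses the old mapred Hadoop API that saveAsHadoopFile expects):

```scala
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Routes each record into a file named after its key, and strips the key
// from the written value.
class KeyAsFileNameFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

// together: RDD[(Int, String)] as in the split example above
together.saveAsHadoopFile("hdfs://.../by-column", classOf[Int], classOf[String],
  classOf[KeyAsFileNameFormat])
// Each column can then be loaded back and processed independently, e.g.:
// val col0 = sc.textFile("hdfs://.../by-column/0"); col0.top(10)
```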
