当前位置: 首页 > 知识库问答 >
问题:

Apache Spark中的分层数据操作

上官思博
2023-03-14

我在Spark(v2.1.1)中有一个数据集,其中有3列(如下所示)包含分层数据。

  • 我的目标是根据父子层次结构为每一行分配增量编号。从图形上可以说,分层数据是树的集合
  • 根据下表,我已经根据“Global_ID”对行进行了分组。现在,我想以增量顺序生成“值”列,但基于“父”列和“子”列的数据层次结构

表格表示(值是所需的输出):

    +-----------+--------+-------+         +-----------+--------+-------+-------+
    |      Current Dataset       |         |      Desired Dataset (Output)      |
    +-----------+--------+-------+         +-----------+--------+-------+-------+
    | Global_ID | Parent | Child |         | Global_ID | Parent | Child | Value |
    +-----------+--------+-------+         +-----------+--------+-------+-------+
    |       111 |    111 |   123 |         |       111 |    111 |   111 |     1 |
    |       111 |    135 |   246 |         |       111 |    111 |   123 |     2 |
    |       111 |    123 |   456 |         |       111 |    123 |   789 |     3 |
    |       111 |    123 |   789 |         |       111 |    123 |   456 |     4 |
    |       111 |    111 |   111 |         |       111 |    111 |   135 |     5 |
    |       111 |    135 |   468 |         |       111 |    135 |   246 |     6 |
    |       111 |    135 |   268 |         |       111 |    135 |   468 |     7 |
    |       111 |    268 |   321 |         |       111 |    135 |   268 |     8 |
    |       111 |    138 |   139 |         |       111 |    268 |   321 |     9 |
    |       111 |    111 |   135 |         |       111 |    111 |   138 |    10 |
    |       111 |    111 |   138 |         |       111 |    138 |   139 |    11 |
    |       222 |    222 |   654 |         |       222 |    222 |   222 |    12 |
    |       222 |    654 |   721 |         |       222 |    222 |   987 |    13 |
    |       222 |    222 |   222 |         |       222 |    222 |   654 |    14 |
    |       222 |    721 |   127 |         |       222 |    654 |   721 |    15 |
    |       222 |    222 |   987 |         |       222 |    721 |   127 |    16 |
    |       333 |    333 |   398 |         |       333 |    333 |   333 |    17 |
    |       333 |    333 |   498 |         |       333 |    333 |   398 |    18 |
    |       333 |    333 |   333 |         |       333 |    333 |   498 |    19 |
    |       333 |    333 |   598 |         |       333 |    333 |   598 |    20 |
    +-----------+--------+-------+         +-----------+--------+-------+-------+

树形表示(每个节点旁边都显示了所需的值):

                      +-----+                                           +-----+
                   1  | 111 |                                       17  | 333 |
                      +--+--+                                           +--+--+
                         |                                                 |
         +---------------+--------+-----------------+           +----------+----------+
         |                        |                 |           |          |          |
      +--v--+                  +--v--+           +--v--+     +--v--+    +--v--+    +--v--+
   2  | 123 |                5 | 135 |        10 | 138 |     | 398 |    | 498 |    | 598 |
      +--+--+                  +--+--+           +--+--+     +--+--+    +--+--+    +--+--+  
   +-----+-----+         +--------+--------+        |          18         19         20
   |           |         |        |        |        |  
+--v--+     +--v--+   +--v--+  +--v--+  +--v--+  +--v--+ 
| 789 |     | 456 |   | 246 |  | 468 |  | 268 |  | 139 |                 +-----+
+-----+     +-----+   +-----+  +-----+  +--+--+  +-----+             12  | 222 |
   3           4         6        7      8 |        11                   +--+--+
                                        +--v--+                             |
                                        | 321 |                      +------+-------+
                                        +--+--+                      |              |
                                           9                      +--v--+        +--v--+
                                                               13 | 987 |    14  | 654 |
                                                                  +--+--+        +--+--+
                                                                                    |
                                                                                 +--v--+
                                                                             15  | 721 |
                                                                                 +--+--+
                                                                                    |
                                                                                 +--v--+
                                                                             16  | 127 |
                                                                                 +--+--+

代码片段:

Dataset<Row> myDataset = spark
                .sql("select Global_ID, Parent, Child from RECORDS");

JavaPairRDD<Row,Long> finalDataset = myDataset.groupBy(new Column("Global_ID"))
    .agg(functions.sort_array(functions.collect_list(new Column("Parent").as("parent_col"))),
        functions.sort_array(functions.collect_list(new Column("Child").as("child_col"))))
    .orderBy(new Column("Global_ID"))
    .withColumn("vars", functions.explode(<Spark UDF>)
    .select(new Column("vars"),new Column("parent_col"),new Column("child_col"))
    .javaRDD().zipWithIndex();


// Sample UDF (TODO: Actual Implementation)   
spark.udf().register("computeValue",
                (<Column Names>) -> <functionality & implementation>,
                DataTypes.<xxx>);

经过大量研究并在博客中提出了许多建议,我尝试了以下方法,但无济于事,无法实现我的方案的结果。

技术堆栈:

>

  • 阿帕奇火花 (v2.1.1)

    Java 8

    AWS EMR 集群(Spark 应用程序部署)

    数据量:

      < li >数据集中大约有2千万行

    尝试的方法:

    > < li>

    Spark GraphX图表框架:

    • 使用这种组合,我只能实现顶点和边缘之间的关系,但它不适合我的用例。
      参考: https://graphframes.github.io/user-guide.html

    Spark GraphX Pregel API:

    • 这是我最接近实现预期结果的,但不幸的是,我找不到相同的Java代码片段。其中一个博客中提供的示例是Scala,我不太熟悉。
      参考: https://dzone.com/articles/processing-hierarchical-data-using-spark-graphx-pr

    对当前方法的替代(或)修改的任何建议都将非常有帮助,因为我完全迷失在为这个用例找出解决方案的过程中。

    感谢您的帮助!非常感谢。

  • 共有1个答案

    章丰茂
    2023-03-14

    注:下面的解决方案是scala spark。您可以轻松地将其转换为java代码。

    看看这个。我尝试使用Spark Sql这样做,你可以得到一个想法。基本上的想法是在聚合和分组它们的同时对子、父和globalid进行排序。一旦按globalid分组和排序,则展开其余部分。您将获得有序的结果表,稍后您可以zipSusIndex添加排名(值)

       import org.apache.spark.sql.SQLContext
       import org.apache.spark.sql.functions._
       import org.apache.spark.sql.expressions.UserDefinedFunction
       import org.apache.spark.sql.functions.udf
    
       val sqlContext = new SQLContext(sc)
       import sqlContext.implicits._
    
       val t = Seq((111,111,123), (111,111,111), (111,123,789), (111,268,321), (222,222,654), (222,222,222), (222,721,127), (333,333,398), (333,333,333), (333,333,598))
       val ddd = sc.parallelize(t).toDF
       val zip = udf((xs: Seq[Int], ys: Seq[Int]) => xs zip ys)
       val dd1 = ddd
        .groupBy($"_1")
        .agg(sort_array(collect_list($"_2")).as("v"),
             sort_array(collect_list($"_3")).as("w"))
        .orderBy(asc("_1"))
        .withColumn("vars", explode(zip($"v", $"w")))
        .select($"_1", $"vars._1", $"vars._2").rdd.zipWithIndex
    
      dd1.collect
    

    输出

        res24: Array[(org.apache.spark.sql.Row, Long)] = Array(([111,111,111],0), ([111,111,123],1), ([111,123,321],2),
    ([111,268,789],3), ([222,222,127],4), ([222,222,222],5), ([222,721,654],6),([333,333,333],7), ([333,333,398],8), ([333,333,598],9))
    
     类似资料:
    • 问题内容: 我有一个与父子关系的表,我需要递归查询的帮助 表结构 我正在尝试进行递归查询,但是我无法做到这一点,建议我应该如何查询数据库 问题答案: 正如上面所指出的,这并不是真正的递归,但是如果您知道最大需要深入多少步,则可以沿以下方向使用某些方法(也许使用PHP生成查询): 我首先将父ID设置为NULL而不是0,但这是个人喜好。 ^^在这种情况下,您需要走多远。 [ 下一点没有严格意义 ] 然

    • 本文向大家介绍在ASP.NET 2.0中操作数据之五十七:在分层架构中缓存数据,包括了在ASP.NET 2.0中操作数据之五十七:在分层架构中缓存数据的使用技巧和注意事项,需要的朋友参考一下 导言:   正如前面章节所言,缓存ObjectDataSource的数据只需要简单的设置一些属性。然而,它是在表现层对数据缓存,这就与ASP.NET page页面缓存策略(caching policies)紧

    • 本文向大家介绍分层数据模型,包括了分层数据模型的使用技巧和注意事项,需要的朋友参考一下 分层数据模型是最早的数据模型之一。该模型是基于文件的模型构建,就像树一样。在此树中,父节点可以与多个子节点关联,但是一个子节点只能有一个父节点。 对于目录和文件,可以说单个目录进一步包含多个文件或目录,然后这些目录包含更多文件,依此类推。 这可以表示为- 示例 使用关系数据库的层次模型的示例如下- <员工> E

    • 问题内容: 对于一个简单的数据结构,例如: 供参考,层次树如下所示: 我想计算每个级别的孩子人数。因此,我将获得一个新列“ NoOfChildren”,如下所示: 我读了一些有关分层数据的内容,但是我不知何故卡在了parentID的多个内部联接上。也许有人可以在这里帮助我。 问题答案: 使用 CTE可以满足您的需求。 递归地遍历所有孩子,记住根。 每个根的项目。 这些再次与您的原始表一起产生结果。

    • 开发手册的这一部分关注于中间层开发,并明确描述了这一层的数据访问职责。 先是,详细阐述了Spring全面的事务管理支持,随后,详细说明了Spring Framework如何支持多种中间层数据访问的框架和技术。 第 10 章 事务管理 第 11 章 DAO支持 第 12 章 使用JDBC进行数据访问 第 13 章 使用ORM工具进行数据访问 目录 10. 事务管理 10.1. 简介 10.2. 动机

    • 我正在用Kafka设计一个spark流媒体应用程序。我有以下几个问题:我正在将数据从RDBMS表流式传输到kafka,并使用Spark consumer来使用消息,并使用Spark-SQL进行处理 问题:1。我将数据从表中流式传输到kafka as(键作为表名,值作为JSON记录形式的表数据)——这是正确的体系结构吗? 这种数据库流的架构和设计是否正常,我如何解决转换问题中的转换? 你好Piyus