我在Spark(v2.1.1)中有一个数据集,其中有3列(如下所示)包含分层数据。
表格表示(值是所需的输出):
+-----------+--------+-------+ +-----------+--------+-------+-------+
| Current Dataset | | Desired Dataset (Output) |
+-----------+--------+-------+ +-----------+--------+-------+-------+
| Global_ID | Parent | Child | | Global_ID | Parent | Child | Value |
+-----------+--------+-------+ +-----------+--------+-------+-------+
| 111 | 111 | 123 | | 111 | 111 | 111 | 1 |
| 111 | 135 | 246 | | 111 | 111 | 123 | 2 |
| 111 | 123 | 456 | | 111 | 123 | 789 | 3 |
| 111 | 123 | 789 | | 111 | 123 | 456 | 4 |
| 111 | 111 | 111 | | 111 | 111 | 135 | 5 |
| 111 | 135 | 468 | | 111 | 135 | 246 | 6 |
| 111 | 135 | 268 | | 111 | 135 | 468 | 7 |
| 111 | 268 | 321 | | 111 | 135 | 268 | 8 |
| 111 | 138 | 139 | | 111 | 268 | 321 | 9 |
| 111 | 111 | 135 | | 111 | 111 | 138 | 10 |
| 111 | 111 | 138 | | 111 | 138 | 139 | 11 |
| 222 | 222 | 654 | | 222 | 222 | 222 | 12 |
| 222 | 654 | 721 | | 222 | 222 | 987 | 13 |
| 222 | 222 | 222 | | 222 | 222 | 654 | 14 |
| 222 | 721 | 127 | | 222 | 654 | 721 | 15 |
| 222 | 222 | 987 | | 222 | 721 | 127 | 16 |
| 333 | 333 | 398 | | 333 | 333 | 333 | 17 |
| 333 | 333 | 498 | | 333 | 333 | 398 | 18 |
| 333 | 333 | 333 | | 333 | 333 | 498 | 19 |
| 333 | 333 | 598 | | 333 | 333 | 598 | 20 |
+-----------+--------+-------+ +-----------+--------+-------+-------+
树形表示(每个节点旁边都显示了所需的值):
+-----+ +-----+
1 | 111 | 17 | 333 |
+--+--+ +--+--+
| |
+---------------+--------+-----------------+ +----------+----------+
| | | | | |
+--v--+ +--v--+ +--v--+ +--v--+ +--v--+ +--v--+
2 | 123 | 5 | 135 | 10 | 138 | | 398 | | 498 | | 598 |
+--+--+ +--+--+ +--+--+ +--+--+ +--+--+ +--+--+
+-----+-----+ +--------+--------+ | 18 19 20
| | | | | |
+--v--+ +--v--+ +--v--+ +--v--+ +--v--+ +--v--+
| 789 | | 456 | | 246 | | 468 | | 268 | | 139 | +-----+
+-----+ +-----+ +-----+ +-----+ +--+--+ +-----+ 12 | 222 |
3 4 6 7 8 | 11 +--+--+
+--v--+ |
| 321 | +------+-------+
+--+--+ | |
9 +--v--+ +--v--+
13 | 987 | 14 | 654 |
+--+--+ +--+--+
|
+--v--+
15 | 721 |
+--+--+
|
+--v--+
16 | 127 |
+--+--+
代码片段:
Dataset<Row> myDataset = spark
.sql("select Global_ID, Parent, Child from RECORDS");
JavaPairRDD<Row,Long> finalDataset = myDataset.groupBy(new Column("Global_ID"))
.agg(functions.sort_array(functions.collect_list(new Column("Parent").as("parent_col"))),
functions.sort_array(functions.collect_list(new Column("Child").as("child_col"))))
.orderBy(new Column("Global_ID"))
.withColumn("vars", functions.explode(<Spark UDF>)
.select(new Column("vars"),new Column("parent_col"),new Column("child_col"))
.javaRDD().zipWithIndex();
// Sample UDF (TODO: Actual Implementation)
spark.udf().register("computeValue",
(<Column Names>) -> <functionality & implementation>,
DataTypes.<xxx>);
经过大量研究并在博客中提出了许多建议,我尝试了以下方法,但无济于事,无法实现我的方案的结果。
技术堆栈:
>
阿帕奇火花 (v2.1.1)
Java 8
AWS EMR 集群(Spark 应用程序部署)
数据量:
尝试的方法:
> < li>
Spark GraphX图表框架:
Spark GraphX Pregel API:
对当前方法的替代(或)修改的任何建议都将非常有帮助,因为我完全迷失在为这个用例找出解决方案的过程中。
感谢您的帮助!非常感谢。
注:下面的解决方案是scala spark。您可以轻松地将其转换为java代码。
看看这个。我尝试使用Spark Sql这样做,你可以得到一个想法。基本上的想法是在聚合和分组它们的同时对子、父和globalid进行排序。一旦按globalid分组和排序,则展开其余部分。您将获得有序的结果表,稍后您可以zipSusIndex
添加排名(值)
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val t = Seq((111,111,123), (111,111,111), (111,123,789), (111,268,321), (222,222,654), (222,222,222), (222,721,127), (333,333,398), (333,333,333), (333,333,598))
val ddd = sc.parallelize(t).toDF
val zip = udf((xs: Seq[Int], ys: Seq[Int]) => xs zip ys)
val dd1 = ddd
.groupBy($"_1")
.agg(sort_array(collect_list($"_2")).as("v"),
sort_array(collect_list($"_3")).as("w"))
.orderBy(asc("_1"))
.withColumn("vars", explode(zip($"v", $"w")))
.select($"_1", $"vars._1", $"vars._2").rdd.zipWithIndex
dd1.collect
输出
res24: Array[(org.apache.spark.sql.Row, Long)] = Array(([111,111,111],0), ([111,111,123],1), ([111,123,321],2),
([111,268,789],3), ([222,222,127],4), ([222,222,222],5), ([222,721,654],6),([333,333,333],7), ([333,333,398],8), ([333,333,598],9))
问题内容: 我有一个与父子关系的表,我需要递归查询的帮助 表结构 我正在尝试进行递归查询,但是我无法做到这一点,建议我应该如何查询数据库 问题答案: 正如上面所指出的,这并不是真正的递归,但是如果您知道最大需要深入多少步,则可以沿以下方向使用某些方法(也许使用PHP生成查询): 我首先将父ID设置为NULL而不是0,但这是个人喜好。 ^^在这种情况下,您需要走多远。 [ 下一点没有严格意义 ] 然
本文向大家介绍在ASP.NET 2.0中操作数据之五十七:在分层架构中缓存数据,包括了在ASP.NET 2.0中操作数据之五十七:在分层架构中缓存数据的使用技巧和注意事项,需要的朋友参考一下 导言: 正如前面章节所言,缓存ObjectDataSource的数据只需要简单的设置一些属性。然而,它是在表现层对数据缓存,这就与ASP.NET page页面缓存策略(caching policies)紧
本文向大家介绍分层数据模型,包括了分层数据模型的使用技巧和注意事项,需要的朋友参考一下 分层数据模型是最早的数据模型之一。该模型是基于文件的模型构建,就像树一样。在此树中,父节点可以与多个子节点关联,但是一个子节点只能有一个父节点。 对于目录和文件,可以说单个目录进一步包含多个文件或目录,然后这些目录包含更多文件,依此类推。 这可以表示为- 示例 使用关系数据库的层次模型的示例如下- <员工> E
问题内容: 对于一个简单的数据结构,例如: 供参考,层次树如下所示: 我想计算每个级别的孩子人数。因此,我将获得一个新列“ NoOfChildren”,如下所示: 我读了一些有关分层数据的内容,但是我不知何故卡在了parentID的多个内部联接上。也许有人可以在这里帮助我。 问题答案: 使用 CTE可以满足您的需求。 递归地遍历所有孩子,记住根。 每个根的项目。 这些再次与您的原始表一起产生结果。
开发手册的这一部分关注于中间层开发,并明确描述了这一层的数据访问职责。 先是,详细阐述了Spring全面的事务管理支持,随后,详细说明了Spring Framework如何支持多种中间层数据访问的框架和技术。 第 10 章 事务管理 第 11 章 DAO支持 第 12 章 使用JDBC进行数据访问 第 13 章 使用ORM工具进行数据访问 目录 10. 事务管理 10.1. 简介 10.2. 动机
我正在用Kafka设计一个spark流媒体应用程序。我有以下几个问题:我正在将数据从RDBMS表流式传输到kafka,并使用Spark consumer来使用消息,并使用Spark-SQL进行处理 问题:1。我将数据从表中流式传输到kafka as(键作为表名,值作为JSON记录形式的表数据)——这是正确的体系结构吗? 这种数据库流的架构和设计是否正常,我如何解决转换问题中的转换? 你好Piyus