当前位置: 首页 > 知识库问答 >
问题:

Spark:按id对RDD分组

司徒隐水
2023-03-14

我有两个RDDs。在Spark scala中,如果event1001RDD和event2009RDD具有相同的id,我该如何连接它们?

Val事件1001RDD:模式RDD=[事件类型,id,位置,日期1]

[1001,4929102,LOC01,2015-01-20 10:44:39]
[1001,4929103,LOC02,2015-01-20 10:44:39]
[1001,4929104,LOC03,2015-01-20 10:44:39]

val event 2009 rdd:schemaRDD =[事件类型,id,日期1,日期2]

[2009,4929101,2015-01-20 20:44:39,2015-01-20 20:44:39]
[2009,4929102,2015-01-20 15:44:39,2015-01-20 21:44:39]
[2009,4929103,2015-01-20 14:44:39,2015-01-20 14:44:39]
[2009,4929105,2015-01-20 20:44:39,2015-01-20 20:44:39]

预期结果将是:(唯一)(按 id 排序)

[事件类型,ID,1001 的位置,1001 的日期1,2009 的日期1,2009 的日期2]

2009,4929101,NULL,NULL,2015-01-20 20:44:39,2015-01-20 20:44:39
1001,4929102,LOC01,2015-01-20 10:44:39,2015-01-20 15:44:39,2015-01-20 21:44:39
1001,4929103,LOC02,2015-01-20 10:44:39,2015-01-20 14:44:39,2015-01-20 14:44:39
1001,4929104,LOC03,2015-01-20 10:44:39,NULL,NULL
2009,4929105,NULL,NULL,2015-01-20 20:44:39,2015-01-20 20:44:39

请注意,对于 id 4929102,将使用 1001 作为事件类型。仅当 2009 事件类型在 1001 中没有任何匹配的 ID 时,才会使用它。

它可以是RDD[String]-平面。或RDD元组通过聚合ByKey。我只需要迭代RDD。

共有1个答案

祁雪峰
2023-03-14

这是完全外部连接的情况。干得好。。。

d1=[[1001,4929102,"LOC01","2015-01-20 10:44:39"],[1001,4929103,"LOC02","2015-01-20 10:44:39"],[1001,4929104,"LOC03","2015-01-20 10:44:39"]]
d2=[[2009,4929101,"2015-01-20 20:44:39","2015-01-20 20:44:39"],[2009,4929102,"2015-01-20 15:44:39","2015-01-20 21:44:39"],
    [2009,4929103,"2015-01-20 14:44:39","2015-01-20 14:44:39"],[2009,4929105,"2015-01-20 20:44:39","2015-01-20 20:44:39"]]

d1RDD = sc.parallelize(d1).map(lambda t: Row(d1_eventtype=t[0],d1_id=t[1],d1_location=t[2],d1_date1=t[3]))
d2RDD = sc.parallelize(d2).map(lambda t: Row(d2_eventtype=t[0],d2_id=t[1],d2_date1=t[2],d2_date2=t[3]))
d1DF = ssc.createDataFrame(d1RDD)
d2DF = ssc.createDataFrame(d2RDD)
print d1DF.printSchema()
print d2DF.printSchema()
d1DF.show()
d2DF.show()
d1DF.registerTempTable("d1")
d2DF.registerTempTable("d2")
res = ssc.sql("select case when d1.d1_eventtype is not null then d1.d1_eventtype else d2.d2_eventtype end et, \
                      case when d1.d1_id is not null then d1.d1_id else d2.d2_id end id, \
                      d1.d1_location loc, d1.d1_date1, d2.d2_date1, d2.d2_date2 \
                 from d1 full outer join d2 on d1.d1_id=d2.d2_id order by d1.d1_id")
res.show()

结果:

root
 |-- d1_date1: string (nullable = true)
 |-- d1_eventtype: long (nullable = true)
 |-- d1_id: long (nullable = true)
 |-- d1_location: string (nullable = true)

None
root
 |-- d2_date1: string (nullable = true)
 |-- d2_date2: string (nullable = true)
 |-- d2_eventtype: long (nullable = true)
 |-- d2_id: long (nullable = true)

None
d1_date1            d1_eventtype d1_id   d1_location
2015-01-20 10:44:39 1001         4929102 LOC01      
2015-01-20 10:44:39 1001         4929103 LOC02      
2015-01-20 10:44:39 1001         4929104 LOC03      
d2_date1            d2_date2            d2_eventtype d2_id  
2015-01-20 20:44:39 2015-01-20 20:44:39 2009         4929101
2015-01-20 15:44:39 2015-01-20 21:44:39 2009         4929102
2015-01-20 14:44:39 2015-01-20 14:44:39 2009         4929103
2015-01-20 20:44:39 2015-01-20 20:44:39 2009         4929105
et   id      loc   d1_date1            d2_date1            d2_date2           
2009 4929101 null  null                2015-01-20 20:44:39 2015-01-20 20:44:39
2009 4929105 null  null                2015-01-20 20:44:39 2015-01-20 20:44:39
1001 4929102 LOC01 2015-01-20 10:44:39 2015-01-20 15:44:39 2015-01-20 21:44:39
1001 4929103 LOC02 2015-01-20 10:44:39 2015-01-20 14:44:39 2015-01-20 14:44:39
1001 4929104 LOC03 2015-01-20 10:44:39 null                null      
 类似资料:
  • 假设我希望根据的对其进行分区。 通过覆盖方法对进行分区,并且只使用的hashcode是否正确? 但是,鉴于接受了许多分区参数,我不知道是否需要事先知道种类的数量,如果种类多于分区,会发生什么? 我的目标是打电话 并且在迭代器中只有具有相同的值。

  • 我有两对结构为rdd[String,Int]的RDD,称为rdd1和rdd2。 如果我加入前面的RDD,并在结果RDD(mapValues)的值上执行一个函数,那么所有的工作都将在一个worker中完成,而不是在集群的不同worker节点上分配不同的任务。我的意思是,期望的行为应该是在集群允许的这么多节点中并行执行作为参数传递给mapValues方法的函数。

  • 在Spark流式传输中,是否可以将特定的RDD分区分配给集群中的特定节点(为了数据局部性?) 例如,我得到一个事件流[a,a,a,b,b],并有一个2节点的Spark集群。 我希望所有的a总是去节点1,所有的b总是去节点2。 谢啦!

  • 主要内容:1.RDD特点:,2.RDD的 5大属性,3.RDD的执行原理,4.Spark的核心组件1.RDD特点: 可变: 存储的弹性 容错的弹性 计算的弹性 分片的弹性 RDD 代码中是一个抽象类, 代表弹性的, 不可变, 可分区, 里面的元素可并行计算的集合, 为弹性分布式数据集。 RDD 不保存数据, 但是有血缘关系。 不可变的是逻辑, 如果想加入新的逻辑, 必须封装。 2.RDD的 5大属性 分区列表 分区计算函数 多个RDD有依赖关系 分区器: 一个分区的规则, 和Kafka 类似

  • 我需要创建一个额外的XML标记,比如myGroup&因为我需要创建来自输入的每个ID_Number的组。 以下是我的输入: 我想根据ID_Numbers创建组,以便所有具有公共ID的mySegments都位于myGroup标记下,如下所示: 我知道使用XSLT可以很容易地完成,但我必须使用Groovy来实现这一点。请告知这是否可以由Groovy完成。我尝试了以下链接,但不确定他们如何可以应用到我的

  • 因此,如何跨辅助节点对RDD进行分区,是将被分区的单个RDD还是一个完整的批处理。 我可能拿错了。请指引我