当前位置: 首页 > 知识库问答 >
问题:

Spark嵌套RDD操作

翁文康
2023-03-14

我有两个RDD说

   rdd1 = 
id            | created     | destroyed | price   
1            | 1            | 2            | 10        
2            | 1            | 5            | 11       
3            | 2            | 3            | 11        
4            | 3            | 4            | 12        
5            | 3            | 5            | 11       

rdd2 =

[1,2,3,4,5] # lets call these value as timestamps (ts)

RDD2基本上是使用范围(intial_value、end_value、间隔)生成的。这里的参数可以变化。大小可以与rdd1相同或不同。这个想法是基于使用过滤Criertia的rdd2值将记录从rdd1提取到rdd2(rdd1的记录可以在提取时重复,正如您在输出中看到的那样)

过滤条件rdd1。创建

预期产出:

ts             | prices  
1              | 10,11       # i.e. for ids 1,2 of rdd1      
2              | 11,11       # ids 2,3
3              | 11,12,11    # ids 2,4,5 
4              | 11,11       # ids 2,5

现在我想根据一些使用RDD2键的条件过滤RDD1。(如上所述)并返回将RDD2的键与RDD1的过滤结果连接起来的结果

所以我这样做:

rdd2.map(lambda x : somefilterfunction(x, rdd1))  

def somefilterfunction(x, rdd1):
    filtered_rdd1 = rdd1.filter(rdd1[1] <= x).filter(rdd1[2] > x)
    prices = filtered_rdd1.map(lambda x : x[3])
    res = prices.collect()
    return (x, list(res))

我得到:

异常:似乎您正试图广播RDD或从操作或转换引用RDD。RDD转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(lambda x:rdd2.values.count()*x)无效,因为值转换和计数操作不能在rdd1.map转型。有关详细信息,请参阅SPARK-5063。

我尝试使用groupBy,但由于这里rdd1的元素可以重复一次又一次,与分组相比,我理解分组会将rdd1的每个元素只在某个特定的插槽中组合一次。

现在唯一的方法是使用一个普通的for循环并进行过滤,最后连接所有内容。

有什么建议吗?

共有1个答案

卫和洽
2023-03-14

由于您使用常规范围,因此根本没有理由创建第二个RDD。您可以简单地为每个记录生成特定范围内的值:

from __future__ import division # Required only for Python 2.x
from math import ceil
from itertools import takewhile

rdd1 = sc.parallelize([
    (1, 1, 2, 10),        
    (2, 1, 5, 11),       
    (3, 2, 3, 11),        
    (4, 3, 4, 12),        
    (5, 3, 5, 11),  
])


def generate(start, end, step):
    def _generate(id, created, destroyed, price):
        # Smallest ts >= created
        start_for_record = int(ceil((created - start) / step) * step + start)
        rng = takewhile(
            lambda x: created <= x < destroyed,
            xrange(start_for_record, end, step)) # In Python 3.x use range
        for i in rng:
            yield i, price

    return _generate

result = rdd1.flatMap(lambda x: generate(1, 6, 1)(*x)).groupByKey()

结果:

result.mapValues(list).collect()

## [(1, [10, 11]), (2, [11, 11]), (3, [11, 12, 11]), (4, [11, 11])]
 类似资料:
  • 问题内容: 我正在尝试在Spark中实现K最近邻算法。我想知道是否可以使用嵌套的RDD。这将使我的生活更加轻松。考虑以下代码片段。 目前,此嵌套设置出现错误(我可以在此处发布完整日志)。可以放拳头吗?谢谢 问题答案: 不,这是不可能的,因为RDD的项必须可序列化,而RDD不可序列化。这是有道理的,否则您可能会通过网络传输整个RDD,如果其中包含大量数据,这将是一个问题。如果它不包含很多数据,则可能

  • 主要内容:1.RDD特点:,2.RDD的 5大属性,3.RDD的执行原理,4.Spark的核心组件1.RDD特点: 可变: 存储的弹性 容错的弹性 计算的弹性 分片的弹性 RDD 代码中是一个抽象类, 代表弹性的, 不可变, 可分区, 里面的元素可并行计算的集合, 为弹性分布式数据集。 RDD 不保存数据, 但是有血缘关系。 不可变的是逻辑, 如果想加入新的逻辑, 必须封装。 2.RDD的 5大属性 分区列表 分区计算函数 多个RDD有依赖关系 分区器: 一个分区的规则, 和Kafka 类似

  • 主要内容:1.From Memory,2.From File,3.From File1.From Memory 这里的makeRDD和parallelize没有区别, make底层用的就是parallelize函数 2.From File 3.From File 第二个方法返回了完整路径

  • 我有一个RDD,其模式如下: (我们称之为) 我希望创建一个新的RDD,每一行都为,键和值属于。 我希望输出如下: 有人能帮我处理这段代码吗? 我的尝试: 错误:值映射不是Char的成员 我理解这是因为map函数只适用于,而不是每个。请帮助我在中使用嵌套函数。

  • 主要内容:转换,操作RDD提供两种类型的操作: 转换 行动 转换 在Spark中,转换的作用是从现有数据集创建新数据集。转换是惰性的,因为它们仅在动作需要将结果返回到驱动程序时才计算。 下面来看看一些常用的RDD转换。 - 它返回一个新的分布式数据集, 该数据集是通过函数传递源的每个元素而形成的。 - 它返回一个新数据集, 该数据集是通过选择函数返回的源元素而形成的。 - 这里,每个输入项可以映射到零个或多个输出项,

  • RDDs 支持 2 种类型的操作:转换(transformations) 从已经存在的数据集中创建一个新的数据集;动作(actions) 在数据集上进行计算之后返回一个值到驱动程序。例如,map 是一个转换操作,它将每一个数据集元素传递给一个函数并且返回一个新的 RDD。另一方面,reduce 是一个动作,它使用相同的函数来聚合 RDD 的所有元素,并且将最终的结果返回到驱动程序(不过也有一个并行