我有两个RDD说
rdd1 =
id | created | destroyed | price
1 | 1 | 2 | 10
2 | 1 | 5 | 11
3 | 2 | 3 | 11
4 | 3 | 4 | 12
5 | 3 | 5 | 11
rdd2 =
[1,2,3,4,5] # lets call these value as timestamps (ts)
RDD2基本上是使用范围(intial_value、end_value、间隔)生成的。这里的参数可以变化。大小可以与rdd1相同或不同。这个想法是基于使用过滤Criertia的rdd2值将记录从rdd1提取到rdd2(rdd1的记录可以在提取时重复,正如您在输出中看到的那样)
过滤条件rdd1。创建
预期产出:
ts | prices
1 | 10,11 # i.e. for ids 1,2 of rdd1
2 | 11,11 # ids 2,3
3 | 11,12,11 # ids 2,4,5
4 | 11,11 # ids 2,5
现在我想根据一些使用RDD2键的条件过滤RDD1。(如上所述)并返回将RDD2的键与RDD1的过滤结果连接起来的结果
所以我这样做:
rdd2.map(lambda x : somefilterfunction(x, rdd1))
def somefilterfunction(x, rdd1):
filtered_rdd1 = rdd1.filter(rdd1[1] <= x).filter(rdd1[2] > x)
prices = filtered_rdd1.map(lambda x : x[3])
res = prices.collect()
return (x, list(res))
我得到:
异常:似乎您正试图广播RDD或从操作或转换引用RDD。RDD转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(lambda x:rdd2.values.count()*x)无效,因为值转换和计数操作不能在rdd1.map转型。有关详细信息,请参阅SPARK-5063。
我尝试使用groupBy,但由于这里rdd1的元素可以重复一次又一次,与分组相比,我理解分组会将rdd1的每个元素只在某个特定的插槽中组合一次。
现在唯一的方法是使用一个普通的for循环并进行过滤,最后连接所有内容。
有什么建议吗?
由于您使用常规范围,因此根本没有理由创建第二个RDD。您可以简单地为每个记录生成特定范围内的值:
from __future__ import division # Required only for Python 2.x
from math import ceil
from itertools import takewhile
rdd1 = sc.parallelize([
(1, 1, 2, 10),
(2, 1, 5, 11),
(3, 2, 3, 11),
(4, 3, 4, 12),
(5, 3, 5, 11),
])
def generate(start, end, step):
def _generate(id, created, destroyed, price):
# Smallest ts >= created
start_for_record = int(ceil((created - start) / step) * step + start)
rng = takewhile(
lambda x: created <= x < destroyed,
xrange(start_for_record, end, step)) # In Python 3.x use range
for i in rng:
yield i, price
return _generate
result = rdd1.flatMap(lambda x: generate(1, 6, 1)(*x)).groupByKey()
结果:
result.mapValues(list).collect()
## [(1, [10, 11]), (2, [11, 11]), (3, [11, 12, 11]), (4, [11, 11])]
问题内容: 我正在尝试在Spark中实现K最近邻算法。我想知道是否可以使用嵌套的RDD。这将使我的生活更加轻松。考虑以下代码片段。 目前,此嵌套设置出现错误(我可以在此处发布完整日志)。可以放拳头吗?谢谢 问题答案: 不,这是不可能的,因为RDD的项必须可序列化,而RDD不可序列化。这是有道理的,否则您可能会通过网络传输整个RDD,如果其中包含大量数据,这将是一个问题。如果它不包含很多数据,则可能
主要内容:1.RDD特点:,2.RDD的 5大属性,3.RDD的执行原理,4.Spark的核心组件1.RDD特点: 可变: 存储的弹性 容错的弹性 计算的弹性 分片的弹性 RDD 代码中是一个抽象类, 代表弹性的, 不可变, 可分区, 里面的元素可并行计算的集合, 为弹性分布式数据集。 RDD 不保存数据, 但是有血缘关系。 不可变的是逻辑, 如果想加入新的逻辑, 必须封装。 2.RDD的 5大属性 分区列表 分区计算函数 多个RDD有依赖关系 分区器: 一个分区的规则, 和Kafka 类似
主要内容:1.From Memory,2.From File,3.From File1.From Memory 这里的makeRDD和parallelize没有区别, make底层用的就是parallelize函数 2.From File 3.From File 第二个方法返回了完整路径
我有一个RDD,其模式如下: (我们称之为) 我希望创建一个新的RDD,每一行都为,键和值属于。 我希望输出如下: 有人能帮我处理这段代码吗? 我的尝试: 错误:值映射不是Char的成员 我理解这是因为map函数只适用于,而不是每个。请帮助我在中使用嵌套函数。
主要内容:转换,操作RDD提供两种类型的操作: 转换 行动 转换 在Spark中,转换的作用是从现有数据集创建新数据集。转换是惰性的,因为它们仅在动作需要将结果返回到驱动程序时才计算。 下面来看看一些常用的RDD转换。 - 它返回一个新的分布式数据集, 该数据集是通过函数传递源的每个元素而形成的。 - 它返回一个新数据集, 该数据集是通过选择函数返回的源元素而形成的。 - 这里,每个输入项可以映射到零个或多个输出项,
RDDs 支持 2 种类型的操作:转换(transformations) 从已经存在的数据集中创建一个新的数据集;动作(actions) 在数据集上进行计算之后返回一个值到驱动程序。例如,map 是一个转换操作,它将每一个数据集元素传递给一个函数并且返回一个新的 RDD。另一方面,reduce 是一个动作,它使用相同的函数来聚合 RDD 的所有元素,并且将最终的结果返回到驱动程序(不过也有一个并行