我正在尝试以下代码,该代码向RDD中的每一行添加一个数字,并使用PySpark返回RDD列表。
from pyspark.context import SparkContext
file = "file:///home/sree/code/scrap/sample.txt"
sc = SparkContext('local', 'TestApp')
data = sc.textFile(file)
splits = [data.map(lambda p : int(p) + i) for i in range(4)]
print splits[0].collect()
print splits[1].collect()
print splits[2].collect()
输入文件(sample.txt)中的内容为:
1
2
3
我期待这样的输出(将rdd中的数字分别添加0、1、2):
[1,2,3]
[2,3,4]
[3,4,5]
而实际输出是:
[4, 5, 6]
[4, 5, 6]
[4, 5, 6]
这意味着无论 范围(4) 为何,该理解仅将值3用于变量i 。
为什么会发生这种现象?
它的发生是由于Python的后期绑定,而不是特定于(Py)Spark的。i
将在lambda p : int(p) + i
使用时(而不是在定义时)查找。通常,它是指何时调用它,但在此特定上下文中,它是序列化发送给工作人员的时间。
您可以例如执行以下操作:
def f(i):
def _f(x):
try:
return int(x) + i
except:
pass
return _f
data = sc.parallelize(["1", "2", "3"])
splits = [data.map(f(i)) for i in range(4)]
[rdd.collect() for rdd in splits]
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
问题内容: 我在Elasticsearch中使用Pyspark。我注意到,当您创建RDD时,不会在任何收集,计数或任何其他“最终”操作之前执行该RDD。 当我将转换后的RDD的结果用于其他事情时,是否还有执行和缓存转换后的RDD的方法。 问题答案: 就像我在评论部分所说的那样, Spark中的所有转换都是 惰性的 ,因为它们不会立即计算出结果。相反,他们只记得应用于某些基本数据集(例如文件)的转换
我在Dataframe中有数据,所有列都是字符串。现在,列中的一些数据是数值的,所以我可以强制转换为浮动。其他行实际上包含我不想强制转换的字符串。 像这样的事情通常是可能的(在没有UDF等的情况下以一种性能方式)吗?
获取所有评论 GET /comments 请求查询参数: 名字 类型 描述 limit integer 可选,本次请求需要返回的数据条数。 index integer 可选,查询开始的评论位置,来源响应 id 字段。 direction string 可选,数据排序方向,以 id 进行排序,支持 asc 或 desc,默认 desc。 author integer 可选,需要筛选的评论作者,传递
评论一条资讯 获取一条资讯的评论列表 删除一条资讯评论 评论一条资讯 POST /news/{news}/comments 参数 名称 描述 body 评论内容 reply_user 被回复用户id 默认为0 Response Headers Status: 201 Created { "message": [ "操作成功" ], "comment": { "use
获取问题评论列表 获取回答评论列表 评论问题 评论答案 删除问题评论 删除回答评论 获取问题评论列表 GET /questions/:question/comments 参数 名称 类型 描述 limit Integer 默认 20 ,获取列表条数,修正值 1 - 30。 after integer 默认 0 ,筛选偏移, 上一次获取的评论列表中最后一条的id 响应 Status: 200 OK
音乐评论列表 专辑评论列表 添加音乐评论 添加专辑评论 删除音乐评论 删除专辑评论 音乐评论列表 GET /music/{music}/comments Parameters 名称 类型 描述 limit Integer 可选,默认值 15 ,获取条数 max_id Integer 可选,上次获取到数据最后一条 ID,用于获取该 ID 之后的数据。 Response Status: 200 OK