在我的猪代码中,我这样做:
all_combined = Union relation1, relation2,
relation3, relation4, relation5, relation 6.
我想用spark做同样的事情。但是,不幸的是,我看到我必须成对进行:
first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
# .... and so on
是否有联合运算符可以让我一次对多个rdds进行操作:
例如 union(rdd1, rdd2,rdd3, rdd4, rdd5, rdd6)
这是一个方便的问题。
如果这些是RDD,则可以使用SparkContext.union
方法:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
rdd3 = sc.parallelize([7, 8, 9])
rdd = sc.union([rdd1, rdd2, rdd3])
rdd.collect()
## [1, 2, 3, 4, 5, 6, 7, 8, 9]
没有DataFrame
等效项,但这只是一个简单的问题:
from functools import reduce # For Python 3.x
from pyspark.sql import DataFrame
def unionAll(*dfs):
return reduce(DataFrame.unionAll, dfs)
df1 = sqlContext.createDataFrame([(1, "foo1"), (2, "bar1")], ("k", "v"))
df2 = sqlContext.createDataFrame([(3, "foo2"), (4, "bar2")], ("k", "v"))
df3 = sqlContext.createDataFrame([(5, "foo3"), (6, "bar3")], ("k", "v"))
unionAll(df1, df2, df3).show()
## +---+----+
## | k| v|
## +---+----+
## | 1|foo1|
## | 2|bar1|
## | 3|foo2|
## | 4|bar2|
## | 5|foo3|
## | 6|bar3|
## +---+----+
如果要在RDD上DataFrames
大量使用SparkContext.union
和重新创建,DataFrame
可能是避免与准备执行计划的成本相关的问题的更好选择:
def unionAll(*dfs):
first, *_ = dfs # Python 3.x, for 2.x you'll have to unpack manually
return first.sql_ctx.createDataFrame(
first.sql_ctx._sc.union([df.rdd for df in dfs]),
first.schema
)
我想了解以下关于火花概念的RDD的事情。 > RDD仅仅是从HDFS存储中复制某个节点RAM中的所需数据以加快执行的概念吗? 如果一个文件在集群中被拆分,那么对于单个flie来说,RDD从其他节点带来所有所需的数据? 如果第二点是正确的,那么它如何决定它必须执行哪个节点的JVM?数据局部性在这里是如何工作的?
一些脚本在工作时什么也不做,当我手动运行它们时,其中一个失败了,出现了以下消息: 错误SparkUI:未能绑定SparkUI java.net.bindexception:地址已在使用:服务“SparkUI”在重试16次后失败! 所以我想知道是否有一种特定的方法来并行运行脚本?
我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。 案例1 案例2 案例3 案例4 文件/home/pvikash/data/test的内容。txt是: 这是一个测试文件。将用于rdd分区 基于以上案例,我有几个问题。 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1? 对于案例3,为什么在指定数量的
我正在使用火花流,我从Kafka读取流。阅读此流后,我将其添加到hazelcast地图中。 问题是,我需要在读取Kafka的流之后立即从地图中过滤值。 我正在使用下面的代码来并行化地图值。 但在这个逻辑中,我在另一个逻辑中使用JavaRDD,即JavaInputDStream.foreachRDD,这会导致序列化问题。 第一个问题是,如何通过事件驱动来运行spark作业? 另一方面,我只是想得到一
CreateDataFrame接受2个参数,一个rdd和模式。 我的图式是这样的 <代码>val schemas=结构类型(Seq(StructField(“number”,IntegerType,false),StructField(“notation”,StringType,false))) 在一种情况下,我能够从RDD创建数据帧,如下所示: 在以下其他情况下。。我不能 data2不能成为Da
我有RDD,其中每个记录都是int: 我所需要做的就是将这个RDD拆分成批。即。制作另一个RDD,其中每个元素都是固定大小的元素列表: 这听起来微不足道,然而,最近几天我很困惑,除了下面的解决方案之外,什么也找不到: > 使用ZipWithIndex枚举RDD中的记录: 这将得到我所需要的,然而,我不想在这里使用组。当您使用普通映射Reduce或一些抽象(如Apache Crunch)时,它是微不