我有RDD,其中每个记录都是int:
[0,1,2,3,4,5,6,7,8]
我所需要做的就是将这个RDD拆分成批。即。制作另一个RDD,其中每个元素都是固定大小的元素列表:
[[0,1,2], [3,4,5], [6,7,8]]
这听起来微不足道,然而,最近几天我很困惑,除了下面的解决方案之外,什么也找不到:
>
使用ZipWithIndex枚举RDD中的记录:
[0,1,2,3,4,5]->[(0,0),(1,1),(2,2),(3,3),(4,4),(5,5)]
[(0,[0,1,2]),(1,[3,4,5])]
这将得到我所需要的,然而,我不想在这里使用组。当您使用普通映射Reduce或一些抽象(如Apache Crunch)时,它是微不足道的。但是有没有一种方法可以在Spark中产生类似的结果而不使用重组BY呢?
您没有清楚地解释为什么需要固定大小的RDD,根据您试图完成的任务,可能有更好的解决方案,但为了回答所问的问题,我看到以下选项:
1)根据项目数量和批处理大小实现筛选器。例如,如果您在原始RDD中有1000个项目,并希望将它们分成10个批次,那么您最终将应用10个筛选器,第一个筛选器检查索引是否为[0,99],第二个筛选器检查索引是否为[100,199],以此类推。在应用每个过滤器后,您将有一个RDD。需要注意的是,原始的RDD可能会在过滤之前被缓存。优点:每个产生的RDD都可以单独处理,不必在一个节点上完全分配。缺点:这种方法随着批处理数量的增加而变慢。
2)逻辑上与此类似,但不是filter,而是只实现一个自定义分区器,该分区器根据索引(键)返回分区id,如下所述:用于同等大小分区的自定义分区器。优点:比filters快。缺点:每个分区必须放入一个节点。
3)如果原始RDD中的顺序并不重要,只需要大致相等地分块,您可以在https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-partitions.html中合并/重新分区,在此进行解释
我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。 案例1 案例2 案例3 案例4 文件/home/pvikash/data/test的内容。txt是: 这是一个测试文件。将用于rdd分区 基于以上案例,我有几个问题。 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1? 对于案例3,为什么在指定数量的
我想了解以下关于火花概念的RDD的事情。 > RDD仅仅是从HDFS存储中复制某个节点RAM中的所需数据以加快执行的概念吗? 如果一个文件在集群中被拆分,那么对于单个flie来说,RDD从其他节点带来所有所需的数据? 如果第二点是正确的,那么它如何决定它必须执行哪个节点的JVM?数据局部性在这里是如何工作的?
我正在使用火花流,我从Kafka读取流。阅读此流后,我将其添加到hazelcast地图中。 问题是,我需要在读取Kafka的流之后立即从地图中过滤值。 我正在使用下面的代码来并行化地图值。 但在这个逻辑中,我在另一个逻辑中使用JavaRDD,即JavaInputDStream.foreachRDD,这会导致序列化问题。 第一个问题是,如何通过事件驱动来运行spark作业? 另一方面,我只是想得到一
问题内容: 在我的猪代码中,我这样做: 我想用spark做同样的事情。但是,不幸的是,我看到我必须成对进行: 是否有联合运算符可以让我一次对多个rdds进行操作: 例如 这是一个方便的问题。 问题答案: 如果这些是RDD,则可以使用方法: 没有等效项,但这只是一个简单的问题: 如果要在RDD上大量使用和重新创建,可能是避免与准备执行计划的成本相关的问题的更好选择:
CreateDataFrame接受2个参数,一个rdd和模式。 我的图式是这样的 <代码>val schemas=结构类型(Seq(StructField(“number”,IntegerType,false),StructField(“notation”,StringType,false))) 在一种情况下,我能够从RDD创建数据帧,如下所示: 在以下其他情况下。。我不能 data2不能成为Da
我正在使用的,并将其调用为 。 的方差非常高,以至于大约1%的对集(用百分位数方法验证)使得集合中的值总数的20%。如果Spark随机使用shuffle进行分区,那么很有可能会有1%的数据落入同一分区,从而导致工作人员之间的负载不平衡。 有没有办法确保“重”元组在分区中正常分布?我实际上将分成两个分区,和,基于) 给出的 阈值,以便分离这组元组,然后重新分区。 但获得几乎相同的运行时间。负载可能已