当前位置: 首页 > 知识库问答 >
问题:

如何使用Spark计算累计和

裴焱
2023-03-14

我有一个rdd(String,Int),它是按键排序的

val data = Array(("c1",6), ("c2",3),("c3",4))
val rdd = sc.parallelize(data).sortByKey

现在,我想用零开始第一个键的值,并将后续键作为前一个键的和。

例如:c1=0,c2=c1的值,c3=(c1值c2值),c4=(c1..c3值)预期输出:

(c1,0), (c2,6), (c3,9)...

有可能做到这一点吗?我用地图试过了,但总和没有保存在地图里。

var sum = 0 ;
val t = keycount.map{ x => { val temp = sum; sum = sum + x._2 ; (x._1,temp); }}

共有3个答案

尉迟彬
2023-03-14

我遇到了一个类似的问题,并实现了保罗的解决方案。我想做一个整数频率表按键排序(整数),有一个小问题与np.cumsum(partition_sums),错误是不支持的操作数类型(s)=:'int'和'NoneType' .

因为如果范围足够大,那么每个分区拥有某样东西的概率就足够大(没有无值)。但是,如果范围远小于计数,并且分区数量保持不变,则某些分区将为空。修改后的解决方案来了:

def cumsum(rdd, get_summand):
    """Given an ordered rdd of items, computes cumulative sum of
    get_summand(row), where row is an item in the RDD.
    """
    def cumsum_in_partition(iter_rows):
        total = 0
        for row in iter_rows:
            total += get_summand(row)
            yield (total, row)
    rdd = rdd.mapPartitions(cumsum_in_partition)
    def last_partition_value(iter_rows):
        final = None
        for cumsum, row in iter_rows:
            final = cumsum
        return (final,)
    partition_sums = rdd.mapPartitions(last_partition_value).collect()
    # partition_cumsums = list(np.cumsum(partition_sums))

    #----from here are the changed lines
    partition_sums = [x for x in partition_sums if x is not None] 
    temp = np.cumsum(partition_sums)
    partition_cumsums = list(temp)
    #----

    partition_cumsums = [0] + partition_cumsums   
    partition_cumsums = sc.broadcast(partition_cumsums)
    def add_sums_of_previous_partitions(idx, iter_rows):
        return ((cumsum + partition_cumsums.value[idx], row)
            for cumsum, row in iter_rows)
    rdd = rdd.mapPartitionsWithIndex(add_sums_of_previous_partitions)
    return rdd

#test on random integer frequency
x = np.random.randint(10, size=1000)
D = sqlCtx.createDataFrame(pd.DataFrame(x.tolist(),columns=['D']))
c = D.groupBy('D').count().orderBy('D')
c_rdd =  c.rdd.map(lambda x:x['count'])
cumsums, values = zip(*cumsum(c_rdd,lambda x: x).collect())

瞿兴朝
2023-03-14

Spark内置了对hive分析/窗口功能的支持,使用分析功能可以轻松实现累积和。

Hive wiki分析/窗口功能。

例子:

假设您有sqlContext对象-

val datardd = sqlContext.sparkContext.parallelize(Seq(("a",1),("b",2), ("c",3),("d",4),("d",5),("d",6)))
import sqlContext.implicits._

//Register as test table
datardd.toDF("id","val").createOrReplaceTempView("test")

//Calculate Cumulative sum
sqlContext.sql("select id,val, " +
  "SUM(val) over (  order by id  rows between unbounded preceding and current row ) cumulative_Sum " +
  "from test").show()

这种方法会导致以下警告。在executor耗尽内存的情况下,相应地调整作业的内存参数,以处理庞大的数据集。

警告WindowExec:没有为窗口操作定义分区!将所有数据移动到单个分区,这可能会导致严重的性能下降

我希望这有帮助。

邹海荣
2023-03-14

>

  • 计算每个分区的部分结果:

    val partials = rdd.mapPartitionsWithIndex((i, iter) => {
      val (keys, values) = iter.toSeq.unzip
      val sums  = values.scanLeft(0)(_ + _)
      Iterator((keys.zip(sums.tail), sums.last))
    })
    

    收集部分和

    val partialSums = partials.values.collect
    

    计算分区上的累积和并广播它:

    val sumMap = sc.broadcast(
      (0 until rdd.partitions.size)
        .zip(partialSums.scanLeft(0)(_ + _))
        .toMap
    )
    

    计算最终结果:

    val result = partials.keys.mapPartitionsWithIndex((i, iter) => {
      val offset = sumMap.value(i)
      if (iter.isEmpty) Iterator()
      else iter.next.map{case (k, v) => (k, v + offset)}.toIterator
    })
    

  •  类似资料:
    • 我想知道如何计算的累计总和在AnyLogic中。具体地说,我有一个循环事件,每周改变一个参数的值。从这个参数我想计算它收到的值的累计总和,我怎么做呢? 该事件是循环模式的超时。操作是: "name_parameter"=圆形(max(正常(10,200),0));

    • 由于hive只支持从类集中的子查询,如select*from(subquery),也只支持equjo,因此我们如何从表中计算累积工资,如表员工有记录如下。 因此输出应该如下所示 我怎么能在蜂巢里做到这一点

    • 问题内容: 假设我有一个Java IntStream,是否可以将其转换为具有累积总和的IntStream?例如,以[4、2、6,…]开头的流应转换为[4、6、12,…]。 更笼统地说,应该如何实施有状态流操作?感觉这应该可行: 有一个明显的限制,即它仅适用于顺序流。但是,Stream.map明确需要无状态映射函数。我是否错过了Stream.statefulMap或Stream.cumulative

    • 问题内容: 我正在和熊猫一起工作,但是我没有太多经验。我有以下DataFrame: 而且我需要计算前11行的累积总和。如果先前的数量少于11,则将剩余的数量假定为0。 我试过了: 但是,这并没有实现我想要的,但是这正在旋转累积总和的结果。我该如何实现? 问题答案: 呼叫与和和:

    • 当我使用spark API运行类似的代码时,它在许多不同的(分布式)作业中运行,并且成功运行。当我运行它时,我的代码(应该做与Spark代码相同的事情),我得到一个堆栈溢出错误。知道为什么吗? 代码如下: 我相信我正在使用与spark相同的所有并行化工作,但它对我不起作用。任何关于使我的代码分发/帮助了解为什么在我的代码中发生内存溢出的建议都将是非常有帮助的

    • 问题内容: 我正在尝试获取一些Cassandra / SPARK数据的最小,最大平均值,但我需要使用JAVA进行。 编辑以显示工作版本: 确保在“ someTable”和“ someKeyspace”周围添加“ 问题答案: 只需将您的数据导入为并应用所需的汇总即可: where 和分别存储表名和键空间。