当前位置: 首页 > 知识库问答 >
问题:

如何在Spark Streaming中使用CombineByKey仅在分区内进行'reduce'?

壤驷坚
2023-03-14

我已经通过Kafka将数据按键排序到我的Spark流分区中,即在一个节点上找到的键在任何其他节点上都找不到。

我希望使用redis及其incrby(increment by)命令作为状态引擎,并且为了减少发送到redis的请求数,我希望通过在每个工作节点上进行字数计数来部分减少数据。(关键是标签+时间戳,从字数中获取我的功能)。我希望避免洗牌,让redis负责跨工作节点添加数据。

即使我检查了数据在工作节点之间是否被清晰地分割,.reduce(_+_)(Scala语法)也会花费很长的时间(几秒,而对于map任务则是次秒),因为HashPartitioner似乎会将我的数据洗牌到一个随机节点以将其添加到那里。

如何在Spark Streaming中不触发Scala中的shuffling步骤的情况下,在每个分区器上编写一个简单的字数reduce?

注DStream对象缺少一些RDD方法,这些方法只有通过transform方法才可用。

看来我可以使用combinebykey。我想跳过mergeCombiners()步骤,而是将累积的元组留在原处。《学习的火花》一书谜一般地说:

如果我们知道数据不会从中受益,我们可以禁用combineByKey()中的映射端聚合。例如,groupByKey()禁用映射端聚合,因为聚合函数(附加到列表)不会节省任何空间。如果我们想禁用映射端组合,我们需要指定分区器;现在,您可以通过传递RDD.partitioner在源RDD上使用分区器。

更糟糕的是,据我所知,在Spark Streaming中没有为DStream RDD设置分区器,所以我不知道如何为combineByKey提供一个分区器,它不会最终对数据进行洗牌。

此外,“地图端”实际上是什么意思,mapsidecombine=false会产生什么后果?

CombineByKey的scala实现可以在https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/pairrddfunctions上找到。scala查找CombineByKeyWithClasstag

如果解决方案涉及自定义分区器,请同时包含一个代码示例,说明如何将该分区器应用于传入的DStream。

共有1个答案

李烨
2023-03-14

这可以使用mappartitions来完成,该函数将一个分区上的输入RDD的迭代器映射到输出RDD上的迭代器。

要实现字数计数,我映射到_.2以删除Kafka键,然后使用foldleft执行快速迭代器字数计数,初始化mutable.hashmap,然后将其转换为迭代器以形成输出RDD。

val myDstream = messages
  .mapPartitions( it =>
    it.map(_._2)
    .foldLeft(new mutable.HashMap[String, Int])(
      (count, key) => count += (key -> (count.getOrElse(key, 0) + 1))
    ).toIterator
  )
 类似资料:
  • 我有两个大的数据帧。每一行都有lat/lon数据。我的目标是在两个数据帧之间进行连接,并找到距离内的所有点,例如100m。 我想在geohash7上对df1和df2进行分区,然后只在分区内连接。我希望避免分区之间的连接以减少计算。 所以基本上加入geohash7,然后确保点之间的距离小于100。问题是,Spark实际上会交叉连接所有数据。如何使其只执行分区间连接而不执行分区内连接?

  • 类似Bigtable的数据库存储按其键排序的行。 Cassandra使用分区和聚类键的结合来保持数据的分布和排序;但是,您只能通过使用分区键来选择行! 用于上述查询的Cassandra存储层的可视化。

  • 问题内容: 我有一个内存中大约有1000个项目的数据集,正在尝试为此数据集创建一个传呼机,但是我不确定如何执行此操作。 我使用的是自定义过滤器功能来过滤结果,效果很好,但是以某种方式我需要获取页面数。 有什么线索吗? 问题答案: 查看UI Bootstrap的分页指令。我最终使用了它,而不是使用此处发布的内容,因为它具有当前使用的足够功能,并且具有详尽的测试规范。 视图 控制者 我做了一个工作的小

  • 我试图查询具有分区键和排序键的表(但是分区键和排序键是1:1,我只想使用分区键[仅返回一项]进行查询)。 这是我尝试过的代码,但没有成功(testId是分区键名,1234567890是字符串形式的分区键值);你们都知道我可以只使用分区键进行查询的方法吗?记住,由于分区键和排序键是1:1,所以只会返回一个项?提前非常感谢您。[这是我的第一篇堆栈溢出帖子-很抱歉,如果我用词不当,我很乐意回答关于我的措

  • 问题内容: 有没有一种方法可以使用VLOOKUP内部联接两个不同的Excel电子表格? 在SQL中,我可以这样进行: 工作表1: 工作表2: 结果将是: 如何在VLOOKUP中执行此操作?还是除了VLOOKUP之外还有更好的方法吗? 谢谢。 问题答案: 首先,让我们获取两个表中都存在的值的列表。如果您使用的是excel 2010或更高版本,则在工作表3 A2中输入以下公式: 如果您使用的是2007