当前位置: 首页 > 知识库问答 >
问题:

如何在akka流中使用mapAsync分组子流

邹誉
2023-03-14

我的问题是,我有一个未知的组数,如果mapAsync的并行数少于我得到的组数,并且在最后一个接收器中出错

由于上游错误(Akka.Stream.Impl.StreamSubscriptionTimeoutSupport$$Anon$2),正在拆除SynchronousFileSink(/users/sam/dev/projects/akka-streams/target/log-error.txt)

我尝试按照akka流的模式指南http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html中的建议在中间放置一个缓冲区

groupBy {
  case LoglevelPattern(level) => level
  case other                  => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
  // write lines of each group to a separate file
  mapAsync(parallelism = 2) {....

共有1个答案

姜嘉赐
2023-03-14

对Jrudolph完全正确的评论进行了扩展。

在此实例中不需要mapasync。作为一个基本示例,假设您有一个元组的源

import akka.stream.scaladsl.{Source, Sink}

def data() = List(("foo", 1),
                  ("foo", 2),
                  ("bar", 1),
                  ("foo", 3),
                  ("bar", 2))

val originalSource = Source(data)

然后可以执行groupBy来创建源的源

def getID(tuple : (String, Int)) = tuple._1

//a Source of (String, Source[(String, Int),_])
val groupedSource = originalSource groupBy getID

每一个分组的源都可以通过一个map并行处理,不需要任何花哨的东西。下面是一个示例,每个分组都在一个独立的流中求和:

import akka.actor.ActorSystem
import akka.stream.ACtorMaterializer

implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher

def getValues(tuple : (String, Int)) = tuple._2

//does not have to be a def, we can re-use the same sink over-and-over
val sumSink = Sink.fold[Int,Int](0)(_ + _)

//a Source of (String, Future[Int])
val sumSource  = 
  groupedSource map { case (id, src) => 
    id -> {src map getValues runWith sumSink} //calculate sum in independent stream
  }

现在,所有“foo”数字与所有“bar”数字并行求和。

MapAsync用于以下情况:您有一个封装函数返回future[T],而您试图发出T;你的问题不是这样的。此外,mapAsync还需要等待结果,这不是反应性的...

 类似资料:
  • 我想利用一个简单的流从http服务收集一些额外的数据,并用这些结果来增强我的数据对象。下面说明了这一想法: 我有一个问题,要理解流的本质和流内部的物化/未来之间的机制和区别。 以下想法并没有向我解释: null

  • 我有一个akka流,我有一个ADT的形式。 现在我有一个这个消息处理程序流和一个那个消息处理程序流。我有一个接受消息类型的入口流。 为了创建拆分,我有以下分区器。我有以下分区器函数的定义。 我希望使用上述方法,并使用类型params中的ADT来初始化分区程序。 编译器抛出这个错误。 据我所知,分区对象只有Inlet(在本例中为A,参数化类型。 有人知道我该怎么解决这个问题吗?

  • 问题内容: 这是一个示例方案: 想象一下,我们有如下员工记录: 等等。目的是计算不同年龄组的平均工资(例如21至30岁之间以及31至40岁之间,依此类推)。 我想使用它来做,而我只是无法理解如何使用它来完成这项工作。我在想也许我需要定义某种元组的年龄范围。有任何想法吗? 问题答案: 以下代码将为您提供所需的内容。关键是“ Collectors”类,它支持分组。 假设工资为整数但易于转换为两倍的插图

  • 当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。

  • 走这条路对吗?如果没有,流量控制该怎么做?

  • 我们有以下架构 SQS(源)->SQS轮询器->我们的业务逻辑->Sink,它从SQS中删除消息。 这是一个akka流(我们的业务逻辑有多个阶段)。 现在我们希望通过添加HTTP服务器(而不是Akka HTTP)来扩展这个体系结构。 现在我们的服务也有了路径 我认为https://doc.akka.io/docs/akka/2.5/stream/operators/source/queue.htm