我想利用一个简单的流从http服务收集一些额外的数据,并用这些结果来增强我的数据对象。下面说明了这一想法:
val httpClient = Http().superPool[User]()
val cityRequest = Flow[User].map { user=>
(HttpRequest(uri=Uri(config.getString("cityRequestEndpoint"))), User)
}
val cityResponse = Flow[(Try[HttpResponse], User)].map {
case (Failure(ex), user) => user
case (Success(resp), user) => {
// << What to do here to get the value >> //
val responseData = processResponseSomehowToGetAValue?
val enhancedUser = new EnhancedUser(user.data, responseData)
enhancedUser
}
}
val processEnhancedUser = Flow[EnhancedUser].map {
// e.g.: Asynchronously save user to a database
}
val useEnhancementGraph = userSource
.via(getRequest)
.via(httpClient)
.via(getResponse)
.via(processEnhancedUser)
.to(Sink.foreach(println))
我有一个问题,要理解流的本质和流内部的物化/未来之间的机制和区别。
以下想法并没有向我解释:
val cityResponse = Flow[(Try[HttpResponse], User)].map {
case (Failure(ex), member) => member
case (Success(response), member) => {
Unmarshal(response.entity).to[String] onComplete {
case Success(s) => member.city = Some(s)
case Failure(ex) => member.city = None
}
}
member
}.async // <<-- This changed the behavior to be correct, why?
根据从“CityRequestEndpoint”获取的实体的性质,您可以使用两种不同的策略:
基于流
处理这种情况的典型方法是始终假设来自源endpoint的实体可以包含N段数据,其中N段是事先不知道的。这通常是要遵循的模式,因为它在现实世界中是最通用的,因此也是“最安全的”。
val convertResponseToByteStrSource : (Try[HttpResponse], User) => Source[(Option[ByteString], User), _] =
(response, user) => response match {
case Failure(_) => Source single (None -> user)
case Success(r) => r.entity.dataBytes map (byteStr => Some(byteStr) -> user)
}
val cityByteStrFlow : Flow[(Try[HttpResponse], User), (Option[ByteString], User), _] =
Flow[(Try[HttpResponse], User)] flatMapConcat convertResponseToByteStrSource
val convertByteStringToUser : (Option[ByteString], User) => EnhancedUser =
(byteStr, user) =>
byteStr
.map(s => EnhancedUser(user.data, s))
.getOrElse(user)
val cityUserFlow : Flow[(Option[ByteString], User), EnhancedUser, _] =
Flow[(ByteString, User)] map convertByteStringToUser
val useEnhancementGraph =
userSource
.via(cityRequest)
.via(httpClient)
.via(cityByteStrFlow)
.via(cityUserFlow)
.via(processEnhancedUser)
.to(Sink foreach println)
val parallelism = 10
val timeout : FiniteDuration = ??? //you need to specify the timeout limit
val convertResponseToFutureByteStr : (Try[HttpResponse], User) => Future[EnhancedUser] =
_ match {
case (Failure(ex), user) =>
Future successful user
case (Success(resp), user) =>
resp
.entity
.toStrict(timeout)
.map(byteStr => new EnhancedUser(user.data, byteStr))
}
val cityResponse : Flow[(Try[HttpResponse], User), EnhancedUser, _] =
Flow[(Try[HttpResponse], User)].mapAsync(parallelism)(convertResponseToFutureByteStr)
我的问题是,我有一个未知的组数,如果mapAsync的并行数少于我得到的组数,并且在最后一个接收器中出错 由于上游错误(Akka.Stream.Impl.StreamSubscriptionTimeoutSupport$$Anon$2),正在拆除SynchronousFileSink(/users/sam/dev/projects/akka-streams/target/log-error.txt
学习Akka Streams。我有一个记录流,每个时间单位很多,已经按时间排序(来自Slick),我想通过检测时间步长何时变化来将它们批处理为时间组。 实例 如果传入流是 我想把它转换成 到目前为止,我只发现了按固定数量的记录进行分组,或者拆分成许多子流,但从我的角度来看,我不需要多个子流。 更新:我发现了,但它看起来更关心背压,而不仅仅是一直批处理。
当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。
根据文档[1],我一直试图在Akka stream中并行化一个流,但由于某些原因,我没有得到预期的结果。 我遵循了留档中列出的步骤,我不认为我错过了什么。然而,我的流的计算都是按顺序一个接一个地发生的。 我错过了什么? [1] https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html 示例输出 我希望看到两个计算同时进
阅读akka-stream的留档,我不太清楚消息的顺序以及是否可以强制执行。让我用我为聊天服务器编写的一小段代码来设置我的问题的上下文。 为了让事情变得简单,我使用了这个流的形状和一个非常简单的源和汇。像这样的-- 现在,我的担忧来了。终端中打印的事件顺序根本不正常。我不知道该怎么解决。这是我得到的结果-- 输出中缺少第一条消息。消息似乎是在打印之前发送的。 我尝试通过使用(我在上面的代码中对此进