当前位置: 首页 > 知识库问答 >
问题:

Akka流在流中使用HttpResponse

羊舌富
2023-03-14

我想利用一个简单的流从http服务收集一些额外的数据,并用这些结果来增强我的数据对象。下面说明了这一想法:

val httpClient = Http().superPool[User]()

val cityRequest = Flow[User].map { user=>
  (HttpRequest(uri=Uri(config.getString("cityRequestEndpoint"))), User)
}

val cityResponse = Flow[(Try[HttpResponse], User)].map {
  case (Failure(ex), user) => user
  case (Success(resp), user) => {
    // << What to do here to get the value >> //
    val responseData = processResponseSomehowToGetAValue?
    val enhancedUser = new EnhancedUser(user.data, responseData)
    enhancedUser
  }
}

val processEnhancedUser = Flow[EnhancedUser].map {
  // e.g.: Asynchronously save user to a database
}

val useEnhancementGraph = userSource
  .via(getRequest)
  .via(httpClient)
  .via(getResponse)
  .via(processEnhancedUser)
  .to(Sink.foreach(println))

我有一个问题,要理解流的本质和流内部的物化/未来之间的机制和区别。

以下想法并没有向我解释:

    null
val cityResponse = Flow[(Try[HttpResponse], User)].map {
  case (Failure(ex), member) => member
  case (Success(response), member) => {
    Unmarshal(response.entity).to[String] onComplete {
      case Success(s) =>  member.city = Some(s)
      case Failure(ex) => member.city = None
    }
  }
  member
}.async  // <<-- This changed the behavior to be correct, why?

共有1个答案

齐财
2023-03-14

根据从“CityRequestEndpoint”获取的实体的性质,您可以使用两种不同的策略:

基于流

处理这种情况的典型方法是始终假设来自源endpoint的实体可以包含N段数据,其中N段是事先不知道的。这通常是要遵循的模式,因为它在现实世界中是最通用的,因此也是“最安全的”。

val convertResponseToByteStrSource : (Try[HttpResponse], User) => Source[(Option[ByteString], User), _] = 
  (response, user) => response match {
    case Failure(_) => Source single (None -> user)
    case Success(r) => r.entity.dataBytes map (byteStr => Some(byteStr) -> user)
  }
val cityByteStrFlow : Flow[(Try[HttpResponse], User), (Option[ByteString], User), _] = 
  Flow[(Try[HttpResponse], User)] flatMapConcat convertResponseToByteStrSource
val convertByteStringToUser : (Option[ByteString], User) => EnhancedUser = 
  (byteStr, user) => 
    byteStr
      .map(s => EnhancedUser(user.data, s))
      .getOrElse(user)

val cityUserFlow : Flow[(Option[ByteString], User), EnhancedUser, _] = 
  Flow[(ByteString, User)] map convertByteStringToUser
val useEnhancementGraph =
  userSource
    .via(cityRequest)
    .via(httpClient)
    .via(cityByteStrFlow)
    .via(cityUserFlow)
    .via(processEnhancedUser)
    .to(Sink foreach println)
    null
val parallelism = 10

val timeout : FiniteDuration = ??? //you need to specify the timeout limit

val convertResponseToFutureByteStr : (Try[HttpResponse], User) => Future[EnhancedUser] = 
  _ match {
    case (Failure(ex), user)   => 
      Future successful user
    case (Success(resp), user) => 
      resp
        .entity
        .toStrict(timeout)
        .map(byteStr => new EnhancedUser(user.data, byteStr))
  }    

val cityResponse : Flow[(Try[HttpResponse], User), EnhancedUser, _] =
  Flow[(Try[HttpResponse], User)].mapAsync(parallelism)(convertResponseToFutureByteStr)
 类似资料:
  • 我的问题是,我有一个未知的组数,如果mapAsync的并行数少于我得到的组数,并且在最后一个接收器中出错 由于上游错误(Akka.Stream.Impl.StreamSubscriptionTimeoutSupport$$Anon$2),正在拆除SynchronousFileSink(/users/sam/dev/projects/akka-streams/target/log-error.txt

  • 学习Akka Streams。我有一个记录流,每个时间单位很多,已经按时间排序(来自Slick),我想通过检测时间步长何时变化来将它们批处理为时间组。 实例 如果传入流是 我想把它转换成 到目前为止,我只发现了按固定数量的记录进行分组,或者拆分成许多子流,但从我的角度来看,我不需要多个子流。 更新:我发现了,但它看起来更关心背压,而不仅仅是一直批处理。

  • 当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。

  • 根据文档[1],我一直试图在Akka stream中并行化一个流,但由于某些原因,我没有得到预期的结果。 我遵循了留档中列出的步骤,我不认为我错过了什么。然而,我的流的计算都是按顺序一个接一个地发生的。 我错过了什么? [1] https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html 示例输出 我希望看到两个计算同时进

  • 阅读akka-stream的留档,我不太清楚消息的顺序以及是否可以强制执行。让我用我为聊天服务器编写的一小段代码来设置我的问题的上下文。 为了让事情变得简单,我使用了这个流的形状和一个非常简单的源和汇。像这样的-- 现在,我的担忧来了。终端中打印的事件顺序根本不正常。我不知道该怎么解决。这是我得到的结果-- 输出中缺少第一条消息。消息似乎是在打印之前发送的。 我尝试通过使用(我在上面的代码中对此进