当前位置: 首页 > 知识库问答 >
问题:

如何在一个Actor上的同一个线程内部获取http.singleRequest(httpRequest)的Httpresponse?

公西浩
2023-03-14

我有一个使用< code>httpRequest =发送HTTP POST请求的参与者

我认为我需要以某种方式使用akka.pattern.ask,以免消息到达另一个案例HttpResponse,但保持相同的案例BidRequest,以便我可以编辑变量。

object AuctionClientActor {
  def props(bidders: List[String]) = { Props(new AuctionClientActor(bidders)) }
}

class AuctionClientActor(bidders: List[String])
  extends Actor with ActorLogging
    with BidJsonProtocol with SprayJsonSupport {

  import context.dispatcher

  implicit val system = context.system
  val http = Http(system)

  var bidOffer: BidOffer = BidOffer("", 0, "")

  def receive = {
    case bidRequest@BidRequest(requestId, bid) =>
      val content = bidRequest.bid.toJson.toString

      val latch = new CountDownLatch(bidders.size)

      val listResponseFuture: List[Future[HttpResponse]] = bidders
        .map(bidder =>
          HttpRequest( // create the request
            HttpMethods.POST,
            uri = Uri(bidder), // uri = Uri("http://localhost:8081"),
            entity = HttpEntity(ContentTypes.`application/json`, content)
          )
        )
        // IF I USE pipeTo HERE THE HttpResponse WILL GO TO ANOTHER CASE
        .map(httpRequest => http.singleRequest(httpRequest).pipeTo(self)) // send the request

      listResponseFuture.foreach { response =>
        Await.result(response, 3 seconds)
        response.onComplete {
          case Success(value) => latch.countDown // println(s"response success: $value")
          case Failure(exception) =>
            println(s"response failure: $exception")
            latch.countDown
        }
      }
      latch.await(3, TimeUnit.SECONDS)
      println("sending response now... BUT bidOffer WAS EDITED IN ANOTHER case thread")
      sender() ! Some(bidOffer.content)
      bidOffer = BidOffer("", 0, "")
    case resp@HttpResponse(StatusCodes.OK, headers, entity, _) =>
      log.info(s"received HttpResponse OK(200): $resp")
      entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
        println("Got response, body: " + body.utf8String)
        val newBidOffer = BidOfferConverter.getBidOffer(body.utf8String)
        // I SHOULD NOT EDIT bidOffer HERE. INSTEAD I NEED TO EDIT bidOffer ON THE case BidRequest
        if (bidOffer.bid == 0) {
          println("new")
          bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
        } else if (newBidOffer.bid > bidOffer.bid) {
          println("replace new")
          bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
        } else {
          println("none")
        }
      }
    case resp@HttpResponse(code, _, _, _) =>
      log.info(s"Request failed, response code: $code")
      resp.discardEntityBytes()
  }
}

我正在查看这个答案,以将列表[未来]转换为未来[列表],但是当我这样做时,我创建了一个未来[列表[Any]],而不是HttpResponse

下一个代码段:所以我尝试按照你说的方式做,但我正在创建一个List[Future[Future[String]]]。如果我只有一个主机来做请求,这很容易。但是因为我可以有 1、2 或 3 个请求,所以我创建了一个列表,代码变得复杂。此外,来自akka-streamrunFold创造了另一个未来。你能给一个提示,如何以你说的方式实现它吗?

      val responseListFuture: List[Future[Future[String]]] = bidders.map { bidder =>
        HttpRequest( // create the request
          HttpMethods.POST,
          uri = Uri(bidder), // uri = Uri("http://localhost:8081 | 8082 | 8083"),
          entity = HttpEntity(ContentTypes.`application/json`, content)
        )
      }
        .map { httpRequest =>
          http.singleRequest(httpRequest).pipeTo(self) // this creates the first Future
            .map { httpResponse =>
              println(s"response: $httpResponse")
              // this creates the second Future
              httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map { body =>
                println("Got response, body: " + body.utf8String)
                // BidOfferConverter.getBidOffer(body.utf8String)
                body.utf8String
              }
            }
        }

共有2个答案

孟豪
2023-03-14

我必须让它运行。我还使用TryOptiongetOrElse,以防某些服务器出现故障。所以我仍然发回一个HttpResponse。为了完整起见,我将把答案留在这里。如果有人有更好的方法,我很乐意重新思考。

class AuctionClientActor(bidders: List[String])
  extends Actor with ActorLogging
    with BidJsonProtocol with SprayJsonSupport {

  import context.dispatcher

  implicit val system = context.system
  val http = Http(system)

  def receive = {
    case bidRequest@BidRequest(requestId, bid) =>
      log.info(s"received bid request: $bidRequest")
      val content = bidRequest.bid.toJson.toString
        .replace("[[", "{")
        .replace("]]", "}")
        .replace("\",\"", "\": \"")
        .replace("[", "")
        .replace("]", "")

      val responseListFuture = bidders.map { bidder =>
        HttpRequest( // create the request
          HttpMethods.POST,
          uri = Uri(bidder),
          entity = HttpEntity(ContentTypes.`application/json`, content)
        )
      }
        .map { httpRequest =>
          val httpResponseFuture = http.singleRequest(httpRequest).pipeTo(self) // this creates the first Future[HttpResponse]
          Await.ready(httpResponseFuture, 5 seconds)
          httpResponseFuture.value.get.getOrElse(HttpResponse(StatusCodes.NotFound))
        }.filter(httpResponse => httpResponse.status == StatusCodes.OK)
        .map { httpResponse =>
          println(s"response: $httpResponse")
          val bidOfferFuture = httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map { body =>
            println("Got response, body: " + body.utf8String)
            BidOfferConverter.getBidOffer(body.utf8String)
          }
          Await.ready(bidOfferFuture, 5 seconds)
          bidOfferFuture.value.get.getOrElse(BidOffer("", 0, ""))
        }
      responseListFuture.foreach { bidOffer =>
        println(s"bidOffer: ${bidOffer.id}, ${bidOffer.bid}, ${bidOffer.content}")
      }
      val bidOfferWinner = responseListFuture.maxBy(_.bid)
      println(s"winner: $bidOfferWinner")
      sender() ! Some(bidOfferWinner.content)
  }
}
卜泓
2023-03-14

简短的回答是,你不能,除非在接收中阻塞,这是一个主要的禁忌。

这有一种X:Y问题的感觉。这里的实际目标是什么?只是您不希望在所有请求完成之前发送响应吗?

如果这就是你想要的,那么采取的方法是< code >映射一个未来,将它转换成一个包含你需要建立响应的信息的消息。这样做,您甚至可能不需要< code>bidOffer变量。

Future.sequenceSeq[Future[A]](以及其他集合类型)折叠为Future[Seq[A]](如果任何期货失败,则失败:这可能不是您正在寻找的,在这种情况下,Future伴随对象中的其他组合器可能更适合您正在寻找的)。

 类似资料:
  • 问题内容: 说我有 ID的* 记录 * 我希望能够通过下一个/上一个链接导航到另一个。 问题是,我不知道 如何获取具有最近的较高ID的记录 。 因此,当我有一个 ID为ID 的记录时,我需要能够提取下一个现有记录,即为。 该查询可能看起来像 如何获取下一个/上一个记录而不获取整个结果集并手动迭代? 我正在使用MySQL 5。 问题答案: 下一个: 以前的:

  • 我对高级Java和可重入锁的学习相当陌生。我知道ReentrantLock有一个公平性参数,它确保将锁提供给最缺线程(与同步的内部锁不同) 但是,Reentrant也意味着同一线程可以通过递增holdcount来一次又一次地重新获取锁。如果同一个线程一直获得锁,它如何保证公平性?

  • 问题内容: 假设我有ID为3、4、7、9的记录,并且我希望能够通过下一个/上一个链接导航到另一个。问题是,我不知道如何获取具有最近的较高ID的记录。 因此,当我有一个ID为4的记录时,我需要能够获取下一个现有记录,即7。查询可能看起来像 如何获取下一个/上一个记录而不获取整个结果集并手动进行迭代? 我正在使用MySQL 5。 问题答案: 下一个: 以前:

  • 我写了一个启动两个线程的代码片段;一个线程打印所有奇数,而另一个线程打印所有偶数。我使用了内在锁和线程通信命令的组合来实现两个线程的正确交叉。这是我的代码, 以下是我的问题: > 奇数线程在printOdd()函数中执行,而偶数线程在print偶数()函数中执行。我对两个线程都使用一个内在锁;我不明白两个线程怎么能同时存在于各自的同步块中,因为使用了相同的锁。 我从代码中删除了线程通信语句(通知,

  • 我有一个由线程a读取和更新的同步映射(通过< code > collections . synchronized Map()),线程B只能通过< code>Map.keySet()(只读)访问该映射。 我应该如何同步这个?文档中说key Set()(用于Collections.synchronized映射)“不需要在同步块中”。我可以把线程A的读/写访问放在同步块中,但这有必要吗? 我想,如果Ma

  • 我运行一个大型的minecraft服务器,minecraft服务器端是单线程的。一切都是在主游戏循环中完成的。如果Mojang使minecraft服务器端多线程化,minecraft服务器每年将节省200万美元,因为租用的硬件更少。 不管怎样,我听说过这些谣言和理论。我从来都无法用谷歌搜索并弄清楚。 有没有必要使用多核cpu,并将其转换为单核、单线程? 我一直在猜测虚拟机管理程序软件将运行多线程,