当前位置: 首页 > 知识库问答 >
问题:

向涉及 akka 的异步服务器发送请求

方俊
2023-03-14

我想向一个包含参与者的服务器发出一个异步请求。假设我有两个演员:

 class SessionRetriever extends Actor {
  import SessionRetriever._

  def receiver = {
    Get => 
      val s = getSessionIdFromServer() // 1
      sender ! Result(s)               // 2
  }

  def getSessionIdFromServer(): String = { ... } // 3
}

object SessionRetriever {
  object Get
  object Result(s: String)
}

class RequestSender extends Actor {
  val sActor = context actorOf Props[SessionRetriever]

  def receiver = {
    // get session id
    val sesId = sActor ! SessionRetriever.Get 
    val res = sendRequestToServer(sesId)      
    logToFile(res)                            

    context shutdown sActor       
  }


  def sendRequestToServer(sessionId: String): String = { .... } 
}

我的问题是:

val s = getSessionIdFromServer() // 1
sender ! Result(s)               // 2

1) getSessionIdFromServer()向服务器发出同步请求。我认为异步请求会好得多,对吗?所以它将返回Future[String]而不是一个普通的字符串。

2)我如何使异步:通过使用AsyncHttpClient(如果我没记错的话)或将其同步体包装到Future { } 中?

3)我应该使用阻塞{}块吗?如果是,那么确切的位置:在它的主体内部还是这里val s=阻塞{getSessionIdFromServer()}

附言:在这一点上,我不想使用异步{}等待{},因为它们是相当高级的函数,毕竟它们是建立在Futures之上的

共有3个答案

沈俊晤
2023-03-14

要使用AsyncHttpClient进行异步调用,您可以通过scalapromise处理java的未来。

    import scala.concurrent.Future
    import com.ning.http.client.AsyncHttpClient
    import scala.concurrent.Promise
    import java.util.concurrent.Executor

    object WebClient {

      private val client = new AsyncHttpClient

      case class BadStatus(status: Int) extends RuntimeException

      def get(url: String)(implicit exec: Executor): Future[String] = {
        val f = client.prepareGet(url).execute();
        val p = Promise[String]()
        f.addListener(new Runnable {
          def run = {
            val response = f.get
            if (response.getStatusCode / 100 < 4)
              p.success(response.getResponseBodyExcerpt(131072))
            else p.failure(BadStatus(response.getStatusCode))
          }
        }, exec)
        p.future
      }

      def shutdown(): Unit = client.close()
 }

 object WebClientTest extends App {
   import scala.concurrent.ExecutionContext.Implicits.global
   WebClient get "http://www.google.com/" map println foreach (_ => WebClient.shutdown())
 }

然后通过回调处理未来的完成。

代码归功于Coursera令人敬畏的反应式编程课程。

西门山
2023-03-14

1) 如果getSessionIdFromServer()被阻塞,那么您应该从接收函数异步执行它,否则您的参与者将在每次接收新请求时阻塞,并将始终等到接收到新会话后再处理下一个请求。

2) 使用 Future 会将阻塞操作“移动”到不同的线程。因此,您的Actor不会被阻止,并且能够继续处理传入的请求 - 这很好 - 但是您仍然阻塞了线程 - 不是那么好。使用 AsyncHttpClient 是一个好主意。您可以探索其他非阻塞httpClient,例如PlayWebService。

3)我对< code>blocking不太熟悉,所以不确定我是否应该在这里给出任何建议。据我所知,它将告诉线程池操作正在阻塞,它应该产生一个临时的新线程来处理它——这避免了所有的工作线程都被阻塞。同样,如果您这样做,您的actor将不会被阻塞,但是您在从服务器获取会话时仍然阻塞了一个线程。

总结一下:如果可能的话,只需在getSessionIdFromServer中使用异步http客户端。否则,请使用 Future{}block

宓和同
2023-03-14

你可以试试这种非阻塞的方式

def receive = {
    Get => 
      //assume getSessionIdFromServer() run aysnchronize
      val f: Future[String] = getSessionIdFromServer()
      val client = sender //keep it local to use when future back
      f onComplete {
        case Success(rep) => client ! Result(rep)
        case Failure(ex) => client ! Failed(ex)
      }
 }
 类似资料:
  • XMLHttpRequest 对象用于和服务器交换数据。 向服务器发送请求 如需将请求发送到服务器,我们使用 XMLHttpRequest 对象的 open() 和 send() 方法:xmlhttp.open("GET","ajax_info.txt",true); xmlhttp.send(); 方法 描述 open(method,url,async) 规定请求的类型、URL 以及是否异步处理

  • 我有一个BE服务a,它正在使用假客户端向microservice B发送Rest JSON消息: 终点: Rest Endpoint正在向AWS Ses邮件或其他邮件提供商发送邮件。 问题是来自飞格的第一个呼叫可能需要5秒或更长时间。我需要使其异步,以便FE客户端不要等待邮件发送。 我如何可以使从飞度异步发出的Rest调用到超文本传输协议响应OK没有等待时间可以预期?是否有一些更好的解决方案来实现

  • 在行引发异常: 线程“main”javax.net.ssl.SSLHandShaker异常:Sun.Security.Validator.ValidatoreXception:PKIX路径构建失败:Sun.Security.Provider.CertPath.SunCertPathBuilderException:无法在Sun.Security.SSL.Alerts.GetSleXception(

  • 我最近一直在用Java编写自己的Webserver,因为我觉得有一个很不错,昨天我偶然发现了一个问题,我仍然没有解决。我的浏览器(取消搜索Chromium)似乎向服务器发送了一些空请求或类似的东西。我实现了一个请求处理程序,它应该读取GET请求并提取请求的资源。它的工作原理是这样的:它以请求为例:“GET/index.htmlHTTP/1.1”并将其放入带有String.split (" ");

  • 请求将被发送1个请求/秒,还是所有请求将被一次发送到服务器?