当前位置: 首页 > 知识库问答 >
问题:

在JAX-RS支持的服务器中使用reactor时,如何处理阻塞调用?

陆子石
2023-03-14

为了处理HTTP请求,我们必须将阻塞调用(例如JDBC调用)作为基于mono/flux过程的一部分。我们目前的计划如下所示:

// I renamed getSomething to processJaxrsHttpRequest
CompletionStage<String> processJaxrsHttpRequest(String input) {
  return Mono.just(input)
      .map(in -> process(in))
      .flatMap(str -> Mono.fromCallable(() -> jdbcCall(str)).subscribeOn(fixedScheduler))
      .flatMap(str -> asyncHttpCall(str))
      .flatMap(str -> Mono.fromCallable(() -> jdbcCall(str)).subscribeOn(fixedScheduler))
      .toFuture();
}

其中FixedScheduler跨HTTP请求并发使用。

我们希望得到一些关于这个策略的反馈,以便在相当数量的流量内处理块调用。当然,我们理解,如果所有的请求都经过这些阻塞调用,那么我们还不如不使用reactor(在公认不错的处理API之外)。

更新:感谢bsideup的回答。然而,我应该在我的问题上更具体一点。

我的总体问题是,如果可以大量创建/订阅多个流,如何有效地跨多个流使用阻塞调用。我们尝试了建议的方法,但它导致线程爆炸,并很快OOM。所以我们正在考虑使用一个共享调度器。所以..以下是我的问题。

  1. 正在使用共享调度程序(fixedscheduler)在我描述的情况下您会建议什么?如果没有,你能给我指个方向吗?
  2. 如果使用共享调度器很好,那么这是一个很好的实现吗:schedulers.newparallel(“blocking-scheduler”,maxNumThreads)

更新2:刚刚在调度器#newparallel上挖了一个大坑,意识到这不起作用,因为它“拒绝”阻塞调用。

真的很感激任何提示!

共有1个答案

胡彭亮
2023-03-14

虽然subscribeon确实是处理阻塞调用的一种方法,而且您的用法也可以,但也可以使用publishon
它将处理移到提供的调度程序,除非指定了其他publishon:

CompletionStage<String> getSomething(String input) {
  return Mono.just(input)
      .map(in -> process(in)) // process must be non-blocking, or go after publishOn
      .publishOn(Schedulers.boundedElastic())
      .map(::jdbcCall)
      .flatMap(str -> asyncHttpCall(str))
      .publishOn(Schedulers.boundedElastic())
      .map(::jdbcCall)
      .toFuture();
}

如您所见,您也可以继续使用异步调用。只需确保您没有阻塞非阻塞调度器(在该示例中,我在FlatMap之后再次使用Publishon,因为AsynchTtpCall可以从非阻塞调度器完成)

 类似资料:
  • 问题内容: 我正在开发一个Java脚本客户端应用程序,在服务器端我需要处理CORS,以及我用JERSEY用JAX-RS编写的所有服务。我的代码: 到目前为止,我收到错误消息在请求的资源上没有标头。因此,不允许访问源’ http:// localhost:8080 ‘。” 问题答案: 注意:请务必阅读底部的UPDATE @CrossOriginResourceSharing 是CXF批注,因此不适用

  • 我对使用jersey jetty glassfish之类的web服务器/应用程序/servlet感到困惑。还有web.xml文件?什么?ResourceConfig只接受类,而不接受它们的实例。一切似乎都是那么的一团糟。 我怎么能就这样做类似的事情呢?

  • 我正在开发一个java脚本客户端应用程序,在服务器端我需要处理CORS,所有我用JAX-RS和Jersey编写的服务。我的代码: 感谢佛普尼

  • 如何使用Spring Webflux Netty reactor从阻塞调度器(阻塞池)切换回以前的调度器(reactor http nio)? 代码: 上面我使用的是publishOn(Schedulers.parallel()),但这一个创建了新的线程池(parallel)。相反,我更喜欢切换反应器http nio线程池。 实际结果日志: 预期结果日志:

  • 问题内容: 我正在开发一个Java脚本客户端应用程序,在服务器端我需要处理CORS,以及我用JERSEY用JAX-RS编写的所有服务。我的代码: 到目前为止,我收到错误消息请求的资源上没有标头。因此,不允许访问源’ http:// localhost:8080 ‘。” 请协助我。 问题答案: 注意:请务必阅读底部的UPDATE 是CXF批注,因此不适用于Jersey。 对于Jersey,要处理CO

  • 我有一个基于strut的应用程序,我在其中调用我的Restful Web Service。我的实际服务调用如下所示: 呼叫通过只是罚款,但我想处理的情况下,如果我的服务是关闭,我想超时1分钟,而不是等待这么长时间。