当前位置: 首页 > 知识库问答 >
问题:

垂直。x:如何使用阻塞操作处理HttpRequest

阎宾实
2023-03-14

我刚从Vert开始。并且想了解在处理REST HttpRequest时,处理潜在长(阻塞)操作的正确方法是什么。该应用程序本身就是一个Spring应用程序。

到目前为止,我有一个简化的REST服务:

public class MainApp { 
   // instantiated by Spring
   private AlertsRestService alertsRestService;

   @PostConstruct
   public void init() {
       Vertx.vertx().deployVerticle(alertsRestService);
   }
}

public class AlertsRestService extends AbstractVerticle {
  // instantiated by Spring
    private PostgresService pgService;
    @Value("${rest.endpoint.port:8080}")
    private int restEndpointPort;

    @Override
    public void start(Future<Void> futureStartResult) {
        HttpServer server = vertx.createHttpServer();
        Router router = Router.router(vertx);
        //enable reading of the request body for all routes 
        router.route().handler(BodyHandler.create());

        router.route(HttpMethod.GET, "/allDefinitions")
              .handler(this::handleGetAllDefinitions);

        server.requestHandler(router)
            .listen(restEndpointPort, 
                result -> {
                    if (result.succeeded()) {
                        futureStartResult.complete();
                    } else {
                futureStartResult.fail(result.cause());
                    }
                }
            );
    }

  private void handleGetAllDefinitions( RoutingContext routingContext) {
        HttpServerResponse response = routingContext.response();
        Collection<AlertDefinition> allDefinitions = null;
        try {
            allDefinitions = pgService.getAllDefinitions();
        } catch (Exception e) {
            response.setStatusCode(500).end(e.getMessage());
        }           
        response.putHeader("content-type", "application/json")
            .setStatusCode(200)
            .end(Json.encodePrettily(allAlertDefinitions));
    }

}

Spring配置:

    <bean id="alertsRestService" class="com.my.AlertsRestService"
      p:pgService-ref="postgresService"
      p:restEndpointPort="${rest.endpoint.port}"
    />
    <bean id="mainApp" class="com.my.MainApp"
      p:alertsRestService-ref="alertsRestService"
    />

现在的问题是:如何正确处理(阻止)调用我的postgresService,这可能需要更长的时间,如果有很多项目得到/返回?

在研究和查看一些示例后,我看到了一些方法,但我不完全理解它们之间的区别:

备选案文1。将my AlertsRestService转换为工作线程垂直并使用工作线程池:

public class MainApp { 
   private AlertsRestService alertsRestService;
        
   @PostConstruct
   public void init() {
       DeploymentOptions options = new DeploymentOptions().setWorker(true);
       Vertx.vertx().deployVerticle(alertsRestService, options);
   }
}

这里让我困惑的是Vert. x文档中的这句话:“Worker顶点实例永远不会由Vert. x由多个线程同时执行,但是[可以在不同的时间由不同的线程执行”

这是否意味着对我的警报的所有HTTP请求将被有效地限制,以便一次由一个线程顺序执行?这不是我想要的:这个服务纯粹是无状态的,应该能够很好地处理并发请求......

所以,也许我需要看看下一个选择:

选项2.通过执行类似于文档中的示例的操作,将我的服务转换为多线程的Worker版本:

public class MainApp { 
   private AlertsRestService alertsRestService;
        
   @PostConstruct
   public void init() {
     DeploymentOptions options = new DeploymentOptions()
	  	.setWorker(true)
	  	.setInstances(5) // matches the worker pool size below
	  	.setWorkerPoolName("the-specific-pool")
	  	.setWorkerPoolSize(5);
     Vertx.vertx().deployVerticle(alertsRestService, options);
   }
}

那么,在这个例子中,到底会发生什么?据我所知,“.setInstances(5)”指令意味着将创建我的“alertsRestService”的5个实例。我将此服务配置为Springbean,其依赖项由Spring框架连接。然而,在本例中,我认为这5个实例不是由Spring创建的,而是由Vert创建的。这是真的吗?我怎样才能改成使用Spring呢?

备选案文3。使用“blockingHandler”进行布线。代码中唯一的更改将出现在AlertsRestService中。我如何定义路由器的处理程序中的start()方法:

boolean ordered = false;
router.route(HttpMethod.GET, "/allDefinitions")
	.blockingHandler(this::handleGetAllDefinitions, ordered);

据我所知,将'ordered'参数设置为TRUE意味着可以并发调用处理程序。这是否意味着此选项等同于带有多线程辅助垂直线的选项#2?有什么区别?异步多线程执行是否只涉及一个特定的HTTP请求(用于/allDefinitions路径的请求),而不是整个AlertsRestService垂直路径?

备选案文4。我发现的最后一个选项是显式地使用“executeBlocking()”指令,仅在工作线程中运行封闭的代码。我找不到很多关于如何处理HTTP请求的例子,所以下面是我的尝试——可能不正确。这里的区别仅在于handler方法handleGetAllAlertDefinitions()的实现-但它涉及到很多方面…:

private void handleGetAllAlertDefinitions(RoutingContext routingContext)    {
  vertx.executeBlocking(
      fut -> { fut.complete( sendAsyncRequestToDB(routingContext)); },
      false,
      res -> { handleAsyncResponse(res, routingContext); }
  );
}

public Collection<AlertDefinition> sendAsyncRequestToDB(RoutingContext routingContext) {
  Collection<AlertDefinition> allAlertDefinitions = new LinkedList<>();
  try {
      alertDefinitionsDao.getAllAlertDefinitions();
  } catch (Exception e) {
      routingContext.response().setStatusCode(500)
        .end(e.getMessage());
  }
  return allAlertDefinitions;
}

private void handleAsyncResponse(AsyncResult<Object> asyncResult, RoutingContext routingContext){
  if(asyncResult.succeeded()){
      try { 
          routingContext.response().putHeader("content-type", "application/json")
            .setStatusCode(200)
            .end(Json.encodePrettily(asyncResult.result()));
      } catch(EncodeException e) {
           routingContext.response().setStatusCode(500)
             .end(e.getMessage());
      }
   } else {
      routingContext.response().setStatusCode(500)
        .end(asyncResult.cause());
   }
}

这与其他选项有何不同?选项4提供处理程序的并发执行还是像选项1中那样的单线程执行?

最后,回到最初的问题:在处理REST请求时,处理长时间运行的操作最合适的选项是什么?

抱歉这么长的帖子......:)

非常感谢。

共有1个答案

巢皓君
2023-03-14

这是一个大问题,我不确定我是否能够完全解决它。但让我们试试:

在选项#1中,它的实际含义是,如果您使用多个相同类型的辅助程序,则不应在辅助程序垂直目录中使用ThreadLocal。仅使用一个辅助进程意味着您的请求将被序列化。

选项2完全不正确。不能对类的实例使用setInstances,只能对其名称使用。不过,若您选择使用类的名称Vert,那个么您是正确的。x将实例化它们。

选项#3的并发性不如使用worker,因此不应使用。

选项#4executeblock基本上就是做选项#3,而且也很糟糕。

 类似资料:
  • 我刚刚开始学习vert. x,对它是如何工作的仍然感兴趣...我在想: 当反应性非阻塞垂直线卸载到阻塞垂直线时,阻塞垂直线在工作线程上运行。 1.what事件循环线程,关闭加载的工作,在同一时间?从循环中获取另一个事件? 返回a结果时,同一事件循环是否继续执行?还是另一个? 1的答案是什么 我觉得我错过了一些基本的东西,关于事情实际上是如何工作的。谢谢你的帮助!

  • 我有一个vert。x标准Verticle基本上,它解析HttpRequest并准备JsonObject,然后我通过事件总线发送JsonObject。在另一个Worker verticale中,该事件被消耗,并将启动执行(包括对Penthao数据集成Java API的调用),它正在阻止API。完成“.kjb”文件的执行大约需要30分钟。但是vert。x不断警告Worker线程块,所以我的问题是ver

  • OpenResty 的诞生,一直对外宣传是同步非阻塞(100% non-blocking)的。基于事件通知的 Nginx 给我们带来了足够强悍的高并发支持,但是也对我们的编码有特殊要求。这个特殊要求就是我们的代码,也必须是非阻塞的。如果你的服务端编程生涯一开始就是从异步框架开始的,恭喜你了。但如果你的编程生涯是从同步框架过来的,而且又是刚刚开始深入了解异步框架,那你就要小心了。 Nginx 为了减

  • 这一节解释 BlockingObservable 的子类. 一个阻塞的Observable 继承普通的Observable类,增加了一些可用于阻塞Observable发射的数据的操作符。 要将普通的Observable 转换为 BlockingObservable,可以使用 Observable.toBlocking( )) 方法或者BlockingObservable.from( )) 方法。

  • 我对Vert.x非常陌生,就像几天前一样。我来自一个JAX式的,安逸的世界。我可能大错特错,请指正。 我的问题是:如何使顶点公开自己的REST接口(子路由器),以及如何将其子路由器注册到应用程序的主路由器中? 我尝试过类似的东西,但是当我请求/产品/所有:( } }

  • 如另一个问题中所述,当使用Undertow时,所有处理都应该在专用的工作线程池中完成,如下所示: 我知道可用于显式地告诉Undertow在专用的线程池中调度请求以阻止请求。我们可以通过将包装在实例中来修改上面的示例,如下所示: 调用此方法将exchange置于阻塞模式,并创建一个BlockingHttpExchange对象来存储流。当交换处于阻塞模式时,输入流方法变得可用,除了阻塞和非阻塞模式之间