我刚从Vert开始。并且想了解在处理REST HttpRequest时,处理潜在长(阻塞)操作的正确方法是什么。该应用程序本身就是一个Spring应用程序。
到目前为止,我有一个简化的REST服务:
public class MainApp {
// instantiated by Spring
private AlertsRestService alertsRestService;
@PostConstruct
public void init() {
Vertx.vertx().deployVerticle(alertsRestService);
}
}
public class AlertsRestService extends AbstractVerticle {
// instantiated by Spring
private PostgresService pgService;
@Value("${rest.endpoint.port:8080}")
private int restEndpointPort;
@Override
public void start(Future<Void> futureStartResult) {
HttpServer server = vertx.createHttpServer();
Router router = Router.router(vertx);
//enable reading of the request body for all routes
router.route().handler(BodyHandler.create());
router.route(HttpMethod.GET, "/allDefinitions")
.handler(this::handleGetAllDefinitions);
server.requestHandler(router)
.listen(restEndpointPort,
result -> {
if (result.succeeded()) {
futureStartResult.complete();
} else {
futureStartResult.fail(result.cause());
}
}
);
}
private void handleGetAllDefinitions( RoutingContext routingContext) {
HttpServerResponse response = routingContext.response();
Collection<AlertDefinition> allDefinitions = null;
try {
allDefinitions = pgService.getAllDefinitions();
} catch (Exception e) {
response.setStatusCode(500).end(e.getMessage());
}
response.putHeader("content-type", "application/json")
.setStatusCode(200)
.end(Json.encodePrettily(allAlertDefinitions));
}
}
Spring配置:
<bean id="alertsRestService" class="com.my.AlertsRestService"
p:pgService-ref="postgresService"
p:restEndpointPort="${rest.endpoint.port}"
/>
<bean id="mainApp" class="com.my.MainApp"
p:alertsRestService-ref="alertsRestService"
/>
现在的问题是:如何正确处理(阻止)调用我的postgresService,这可能需要更长的时间,如果有很多项目得到/返回?
在研究和查看一些示例后,我看到了一些方法,但我不完全理解它们之间的区别:
备选案文1。将my AlertsRestService转换为工作线程垂直并使用工作线程池:
public class MainApp {
private AlertsRestService alertsRestService;
@PostConstruct
public void init() {
DeploymentOptions options = new DeploymentOptions().setWorker(true);
Vertx.vertx().deployVerticle(alertsRestService, options);
}
}
这里让我困惑的是Vert. x文档中的这句话:“Worker顶点实例永远不会由Vert. x由多个线程同时执行,但是[可以在不同的时间由不同的线程执行”
这是否意味着对我的警报的所有HTTP请求将被有效地限制,以便一次由一个线程顺序执行?这不是我想要的:这个服务纯粹是无状态的,应该能够很好地处理并发请求......
所以,也许我需要看看下一个选择:
选项2.通过执行类似于文档中的示例的操作,将我的服务转换为多线程的Worker版本:
public class MainApp {
private AlertsRestService alertsRestService;
@PostConstruct
public void init() {
DeploymentOptions options = new DeploymentOptions()
.setWorker(true)
.setInstances(5) // matches the worker pool size below
.setWorkerPoolName("the-specific-pool")
.setWorkerPoolSize(5);
Vertx.vertx().deployVerticle(alertsRestService, options);
}
}
那么,在这个例子中,到底会发生什么?据我所知,“.setInstances(5)”指令意味着将创建我的“alertsRestService”的5个实例。我将此服务配置为Springbean,其依赖项由Spring框架连接。然而,在本例中,我认为这5个实例不是由Spring创建的,而是由Vert创建的。这是真的吗?我怎样才能改成使用Spring呢?
备选案文3。使用“blockingHandler”进行布线。代码中唯一的更改将出现在AlertsRestService中。我如何定义路由器的处理程序中的start()方法:
boolean ordered = false;
router.route(HttpMethod.GET, "/allDefinitions")
.blockingHandler(this::handleGetAllDefinitions, ordered);
据我所知,将'ordered'参数设置为TRUE意味着可以并发调用处理程序。这是否意味着此选项等同于带有多线程辅助垂直线的选项#2?有什么区别?异步多线程执行是否只涉及一个特定的HTTP请求(用于/allDefinitions路径的请求),而不是整个AlertsRestService垂直路径?
备选案文4。我发现的最后一个选项是显式地使用“executeBlocking()”指令,仅在工作线程中运行封闭的代码。我找不到很多关于如何处理HTTP请求的例子,所以下面是我的尝试——可能不正确。这里的区别仅在于handler方法handleGetAllAlertDefinitions()的实现-但它涉及到很多方面…:
private void handleGetAllAlertDefinitions(RoutingContext routingContext) {
vertx.executeBlocking(
fut -> { fut.complete( sendAsyncRequestToDB(routingContext)); },
false,
res -> { handleAsyncResponse(res, routingContext); }
);
}
public Collection<AlertDefinition> sendAsyncRequestToDB(RoutingContext routingContext) {
Collection<AlertDefinition> allAlertDefinitions = new LinkedList<>();
try {
alertDefinitionsDao.getAllAlertDefinitions();
} catch (Exception e) {
routingContext.response().setStatusCode(500)
.end(e.getMessage());
}
return allAlertDefinitions;
}
private void handleAsyncResponse(AsyncResult<Object> asyncResult, RoutingContext routingContext){
if(asyncResult.succeeded()){
try {
routingContext.response().putHeader("content-type", "application/json")
.setStatusCode(200)
.end(Json.encodePrettily(asyncResult.result()));
} catch(EncodeException e) {
routingContext.response().setStatusCode(500)
.end(e.getMessage());
}
} else {
routingContext.response().setStatusCode(500)
.end(asyncResult.cause());
}
}
这与其他选项有何不同?选项4提供处理程序的并发执行还是像选项1中那样的单线程执行?
最后,回到最初的问题:在处理REST请求时,处理长时间运行的操作最合适的选项是什么?
抱歉这么长的帖子......:)
非常感谢。
这是一个大问题,我不确定我是否能够完全解决它。但让我们试试:
在选项#1中,它的实际含义是,如果您使用多个相同类型的辅助程序,则不应在辅助程序垂直目录中使用ThreadLocal。仅使用一个辅助进程意味着您的请求将被序列化。
选项2完全不正确。不能对类的实例使用setInstances
,只能对其名称使用。不过,若您选择使用类的名称Vert,那个么您是正确的。x将实例化它们。
选项#3的并发性不如使用worker,因此不应使用。
选项#4executeblock
基本上就是做选项#3,而且也很糟糕。
我刚刚开始学习vert. x,对它是如何工作的仍然感兴趣...我在想: 当反应性非阻塞垂直线卸载到阻塞垂直线时,阻塞垂直线在工作线程上运行。 1.what事件循环线程,关闭加载的工作,在同一时间?从循环中获取另一个事件? 返回a结果时,同一事件循环是否继续执行?还是另一个? 1的答案是什么 我觉得我错过了一些基本的东西,关于事情实际上是如何工作的。谢谢你的帮助!
我有一个vert。x标准Verticle基本上,它解析HttpRequest并准备JsonObject,然后我通过事件总线发送JsonObject。在另一个Worker verticale中,该事件被消耗,并将启动执行(包括对Penthao数据集成Java API的调用),它正在阻止API。完成“.kjb”文件的执行大约需要30分钟。但是vert。x不断警告Worker线程块,所以我的问题是ver
OpenResty 的诞生,一直对外宣传是同步非阻塞(100% non-blocking)的。基于事件通知的 Nginx 给我们带来了足够强悍的高并发支持,但是也对我们的编码有特殊要求。这个特殊要求就是我们的代码,也必须是非阻塞的。如果你的服务端编程生涯一开始就是从异步框架开始的,恭喜你了。但如果你的编程生涯是从同步框架过来的,而且又是刚刚开始深入了解异步框架,那你就要小心了。 Nginx 为了减
这一节解释 BlockingObservable 的子类. 一个阻塞的Observable 继承普通的Observable类,增加了一些可用于阻塞Observable发射的数据的操作符。 要将普通的Observable 转换为 BlockingObservable,可以使用 Observable.toBlocking( )) 方法或者BlockingObservable.from( )) 方法。
我对Vert.x非常陌生,就像几天前一样。我来自一个JAX式的,安逸的世界。我可能大错特错,请指正。 我的问题是:如何使顶点公开自己的REST接口(子路由器),以及如何将其子路由器注册到应用程序的主路由器中? 我尝试过类似的东西,但是当我请求/产品/所有:( } }
如另一个问题中所述,当使用Undertow时,所有处理都应该在专用的工作线程池中完成,如下所示: 我知道可用于显式地告诉Undertow在专用的线程池中调度请求以阻止请求。我们可以通过将包装在实例中来修改上面的示例,如下所示: 调用此方法将exchange置于阻塞模式,并创建一个BlockingHttpExchange对象来存储流。当交换处于阻塞模式时,输入流方法变得可用,除了阻塞和非阻塞模式之间