当前位置: 首页 > 知识库问答 >
问题:

如何构建异步静止endpoint,在工作线程中调用阻塞操作并立即回复(Quarkus)

马魁
2023-03-14

我查看了文档和stackoverflow,但没有找到合适的方法。E、 g.这篇文章似乎非常接近:在Quarkus/Mutiny的反应式REST GETendpoint中调度阻塞服务。然而,我不希望在我的服务中有这么多不必要的样板代码,最多也不要更改任何服务代码。

我通常只想调用一个使用实体管理器的服务方法,因此是一个阻塞操作,但是,我想立即向调用者返回一个字符串,就像“查询开始”之类的。我不需要回调对象,它只是一种火了就忘记的方法。

我试过这样的

@NonBlocking
@POST
@Produces(MediaType.TEXT_PLAIN)
@Path("/query")
public Uni<String> triggerQuery() {
    return Uni.createFrom()
    .item("query started")
    .call(() -> service.startLongRunningQuery());
}

但它不起作用-

You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.",

我实际上期望qukus注意相应地分配任务,即对io线程的rest调用和对工作线程的阻塞实体管理器操作。所以我一定是用错了。

更新:

还尝试了我在https://github.com/quarkusio/quarkus/issues/11535将方法正文更改为

return Uni.createFrom()
        .item("query started")
        .emitOn(Infrastructure.getDefaultWorkerPool())
        .invoke(()-> service.startLongRunningQuery());

现在我没有得到一个错误,但service.startLongRunningQuery()没有被调用,因此没有日志和查询实际上发送到数据库。

与(如何使用Mutiny反应式编程调用长时间运行的阻塞无效返回方法?):

return Uni.createFrom()
            .item(() ->service.startLongRunningQuery()) 
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())

与(如何在另一个线程上运行阻塞代码并立即返回超文本传输协议请求)相同:

ExecutorService executor = Executors.newFixedThreadPool(10, r -> new Thread(r, "CUSTOM_THREAD"));

return Uni.createFrom()
                .item(() -> service.startLongRunningQuery())
                .runSubscriptionOn(executor);

知道为什么要服务吗。假设rest调用通过IO线程处理,服务调用由工作线程处理,则根本不调用startongrunningquery(),如何实现触发和遗忘行为?

共有2个答案

乐城
2023-03-14

使用EventBus进行https://quarkus.io/guides/reactive-event-bus

发送和忘记是一条路要走。

孟修竹
2023-03-14

这取决于您是要立即返回(在有效执行startongrunningquery操作之前),还是要等待操作完成。

如果是第一种情况,请使用以下内容:

@Inject EventBus bus;

@NonBlocking
@POST
@Produces(MediaType.TEXT_PLAIN)
@Path("/query")
public void triggerQuery() {
    bus.send("some-address", "my payload");
}

@Blocking // Will be called on a worker thread
@ConsumeEvent("some-address")
public void executeQuery(String payload) {
    service.startLongRunningQuery();
}

在第二种情况下,您需要在工作线程上执行查询。

@POST
@Produces(MediaType.TEXT_PLAIN)
@Path("/query")
public Uni<String> triggerQuery() {
   return Uni.createFrom(() -> service.startLongRunningQuery())
      .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
}

请注意,您需要RESTEasy反应式才能工作(而不是经典的RESTEasy)。如果您使用经典的RESTEasy,您将需要quarkus RESTEasy mutiny扩展(但我建议使用RESTEasy Responsive,它将更加有效)。

 类似资料:
  • 从进程调度谈起 现代操作系统(如 Windows、Linux 等)都是分时系统。分时系统允许同时允许多个任务,但实际上,由于一台计算机通常只有一个 CPU,所以不可能真正地同时运行多个任务。这些进程实际上是轮番运行,每个进程运行一个时间片。由于时间片通常很短,用户不会感觉到,所以这些进程看起来就像是同时运行。 每个进程的时间片由操作系统完成初始化,所有进程轮番地执行相应的时间。具体下一个时间片轮到

  • 我有4-5个工作线程处理大型消息队列。我还有另一段代码,它使用2-3个worker运行。我想在处理大型消息队列时阻止所有其他工作者。 我正在使用JDK6和Jms 编辑: 队列进程工作者从未终止。当没有消息时,它们阻塞队列。这些工作者由执行器线程池管理,如果我使用读写锁,其中一个工作者也会被阻塞。此外,如果使用循环屏障,那么我必须终止线程,以便重新传递阻塞的第二个进程。由于工作者是由线程池管理的,所

  • 问题内容: 我有一个执行以下代码的线程: 哪里是。如何才能优雅地停止此类线程?关闭或使用均无效。 问题答案: 这是因为读取System.in(InputStream)是一项阻塞操作。 在这里看是否可以从InputStream读取超时? 这是一种从System.in获取NIO FileChannel并使用超时检查数据可用性的方法,这是问题中所述问题的特例。在控制台上运行它,不要键入任何输入,然后等待

  • OpenResty 的诞生,一直对外宣传是同步非阻塞(100% non-blocking)的。基于事件通知的 Nginx 给我们带来了足够强悍的高并发支持,但是也对我们的编码有特殊要求。这个特殊要求就是我们的代码,也必须是非阻塞的。如果你的服务端编程生涯一开始就是从异步框架开始的,恭喜你了。但如果你的编程生涯是从同步框架过来的,而且又是刚刚开始深入了解异步框架,那你就要小心了。 Nginx 为了减

  • 我有一个vert。x标准Verticle基本上,它解析HttpRequest并准备JsonObject,然后我通过事件总线发送JsonObject。在另一个Worker verticale中,该事件被消耗,并将启动执行(包括对Penthao数据集成Java API的调用),它正在阻止API。完成“.kjb”文件的执行大约需要30分钟。但是vert。x不断警告Worker线程块,所以我的问题是ver

  • 这一节解释 BlockingObservable 的子类. 一个阻塞的Observable 继承普通的Observable类,增加了一些可用于阻塞Observable发射的数据的操作符。 要将普通的Observable 转换为 BlockingObservable,可以使用 Observable.toBlocking( )) 方法或者BlockingObservable.from( )) 方法。