当前位置: 首页 > 知识库问答 >
问题:

如何用多重反应式编程调用长时间运行的阻塞空位返回方法?

施权
2023-03-14

我在Mutiny的Uni上有一系列异步和同步方法调用,有些方法是一个长期运行的进程,返回类型为val

在不阻塞下游的情况下调用/调用它们的正确方式是什么?

下面是简单的类比代码。

class Root {

    public static void main(String[] args) {

        final Response response = getResponsePayload(); // Gets the the Payload from upstream service
        Uni.createFrom().item(response)
            .onItem().invoke(() -> System.out.println("Process Started"))
            .onItem().call(res -> {
            longRunningMethodAsync(res);    // long running blocking method, I want to run on a worker thread
            return Uni.createFrom().voidItem(); // This line I created, because of the ppipeline will be broken if the Uni is not returned from here
        })
            .onItem().transform(item -> item.hello + " mutiny")
            .onItem().transform(String::toUpperCase)
            .subscribe().with(
            item -> System.out.println(">> " + item));  // This is printed to the console
    }



    // Boilerplate method - I created to invoke/call the actual method actual method - `longRunningMethod`, this method basically an adapter
    // This is the best apprach I could come up, but I'm looking for better thatn this as I'm not conviced I'm doing it right
    private static UniSubscribe<Void> longRunningMethodAsync(final Response response) {

        
        return Uni.createFrom().voidItem().invoke(() -> longRunningMethod(response))
            .runSubscriptionOn(Infrastructure.getDefaultExecutor()).subscribe();
    }


    // Important - this is the method I want to run asynchronously independently of main *event-loop* thread.
    private static void longRunningMethod(final Response response) {
  
        System.out.println("Long running process started"); // Doesn't get printed, which means this is never called at all, not even in the blocked manner by the main even-loop thread
    }




   // Not as importatnt, I provded this in case if you like to run on your local box
    private static Response getResponsePayload() {
        return new Response();
    }

    private static class Response {
        public final String hello = "hello";
    }
}

共有2个答案

池恩
2023-03-14

尝试:

private static UniSubscribe<Void> longRunningMethodAsync(final Response response) {
            
   return Uni.createFrom()
  .voidItem()
  .invoke(() -> longRunningMethod(response))
  .runSubscriptionOn(Infrastructure.getDefaultExecutor())
  .subscribeAsCompletionStage().join();
}
齐修贤
2023-03-14

通常,使用runSubscriptionOn并传递特定的执行器:

longRunningMethodAsync
   .runSubscriptionOn(executor);

请注意,它会将并发限制为执行程序中可用的线程数。

参考号:

  • https://smallrye.io/smallrye-mutiny/guides/emit-subscription
  • https://smallrye.io/smallrye-mutiny/guides/emission-threads
 类似资料:
  • 本文向大家介绍node.js回调函数之阻塞调用与非阻塞调用,包括了node.js回调函数之阻塞调用与非阻塞调用的使用技巧和注意事项,需要的朋友参考一下 首先,node.js作为javascript运行平台,它采用了事件驱动和异步编程的方式,通过事件注册和异步函数,开发人员可以提高资源利用率,服务器的性能也能得到改善。其次,对于前端人来说,node.js作为js的运行平台,我们可以通过编写系统级或者

  • 我需要在后台调用一个调用webservice的API。我不想将(非常复杂的)方法转换为异步方法,只想说“在后台完成所有这些”。 但是我迷失在如何用F#做到这一点。这就是我所拥有的:

  • 我完全混淆了,,。 哪个是阻塞,哪个不是? 我的意思是如果我使用父进程是否等待子进程返回/才继续执行。 如何影响这些调用?

  • 我想在另一个Flux流中间调用Mono,向mono发送Flux参数。我正在使用SpringBoot的WebClient。 我试过这个: 但是,如何将第一个API调用的返回发送给param2?然后得到双方的回应?第一个API返回许多值,对于每个值,我需要调用第二个API。 谢谢

  • 我有一些微服务,应该可以在WebFlux框架上运行。每台服务器都有自己的带有Mono或Flux的API。我们使用的是Spring支持的MongoDB(Spring数据MongoDB)。 问题是外部阻塞API,我必须在我的系统中使用它。 我有一个解决办法。我可以在专用线程池中封装阻塞API调用,并在CompletableFuture中使用它。 还有别的办法解决我的问题吗?我想,那个全新的Rsocke

  • 在我的用例中,我有一个带有Reactor Netty的Spring Webflux微服务,我有以下依赖项: null 由于(版本0.7.6版)已包含在最新的(版本2.0.1版)中,因此不能再使用:,请参阅->https://github.com/reactor/reactor-netty/issues/312 我的代码段: 这适用于版本2.0.0.release,但由于升级到版本2.0.1.rel