当前位置: 首页 > 知识库问答 >
问题:

是否可以用Project Reactor在不阻塞线程的情况下等待事件?

莫骞仕
2023-03-14

Project Reactor是否可以在一个mono中等待一个事件/条件,而不需要使用每个mono的阻塞线程?使用CompletableFuture,我可以完成这样的事情,但我不知道如何使用Project Reactor。

我的问题是我需要将请求与响应相关联。响应时间变化很大,有些甚至永远不会得到回复和超时。在客户端,每个请求阻塞线程不是问题,但由于这是一个服务器应用程序,我不想最终导致每个请求产生一个线程阻塞等待响应。

API如下所示:

Mono<Response> doRequest(Mono<Request> request);

由于我不知道如何使用Reactor来实现这一点,所以我将解释如何使用CompletableFuture来实现这一点,以澄清我要寻找的内容。API如下所示:

CompletableFuture<Response> doRequest(Request request);

当调用方调用时,会向服务器发出一个请求,该请求中有一个由该方法生成的相关ID。调用方返回一个CompletableFuture,该方法在映射中存储对该CompletableFuture的引用,并将相关ID作为键。

还有一个线程(池)接收服务器的所有响应。当它收到响应时,它从响应中获取相关ID,并使用它在映射中查找原始请求(即CompletableFuture),并对其调用Complete(response);

在此实现中,每个请求不需要阻塞线程。这基本上更像是一种vert.x/Netty的思维方式?我想知道如何实现这样的事情(如果可能的话)与项目Reactor。

编辑25-07-2019:

根据注释中的请求,为了澄清我在下面得到的内容,我将用completeablefuture实现这一点。

我还注意到我犯了一个可能相当令人困惑的错误:在CompletableFuture示例中,我传递了一个mono作为参数。这本应该只是一个“正常”的论点。我很抱歉,我希望我没有让人们太困惑。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class NonBlockingCorrelatingExample {

    /**
     * This example shows how to implement correlating requests with responses without needing a (sleeping)
     * thread per request to wait for the response with the use of {@link CompletableFuture}'s.
     *
     * So the main feat of this example is that there is always a fixed (small) number of threads used even if one
     * would fire a thousands requests.
     */
    public static void main(String[] args) throws Exception {

        RequestResponseService requestResponseService = new RequestResponseService();

        Request request = new Request();
        request.correlationId = 1;
        request.question = "Do you speak Spanish?";

        CompletableFuture<Response> responseFuture = requestResponseService.doRequest(request);
        responseFuture.whenComplete((response, throwable) -> System.out.println(response.answer));

        // The blocking call here is just so the application doesn't exit until the demo is completed.
        responseFuture.get();
    }

    static class RequestResponseService {

        /** The key in this map is the correlation ID. */
        private final ConcurrentHashMap<Long, CompletableFuture<Response>> responses =  new ConcurrentHashMap<>();

        CompletableFuture<Response> doRequest(Request request) {
            Response response = new Response();
            response.correlationId = request.correlationId;
            CompletableFuture<Response> reponseFuture = new CompletableFuture<>();
            responses.put(response.correlationId, reponseFuture);

            doNonBlockingFireAndForgetRequest(request);

            return reponseFuture;
        }

        private void doNonBlockingFireAndForgetRequest(Request request) {
            // In my case this is where the request would be published on an MQTT broker (message bus) in a request topic.
            // Right now we will just make a call which will simulate a response message coming in after a while.
            simulateResponses();
        }

        private void processResponse(Response response) {
            // There would usually be a (small) thread pool which is subscribed to the message bus which receives messages
            // in a response topic and calls this method to handle those messages.
            CompletableFuture<Response> responseFuture = responses.get(response.correlationId);
            responseFuture.complete(response);
        }

        void simulateResponses() {
            // This is just to make the example work. Not part of the example.
            new Thread(() -> {
                try {
                    // Simulate a delay.
                    Thread.sleep(10_000);

                    Response response = new Response();
                    response.correlationId = 1;
                    response.answer = "Si!";

                    processResponse(response);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    static class Request {
        long correlationId;
        String question;
    }

    static class Response {
        long correlationId;
        String answer;
    }

}

共有1个答案

楚畅
2023-03-14

是的,有可能。您可以使用reactor.core.publisher.mono#create方法来实现

对于您的示例:

public static void main(String[] args) throws Exception {
    RequestResponseService requestResponseService = new RequestResponseService();

    Request request = new Request();
    request.correlationId = 1;
    request.question = "Do you speak Spanish?";


    Mono<Request> requestMono = Mono.just(request)
            .doOnNext(rq -> System.out.println(rq.question));
    requestResponseService.doRequest(requestMono)
            .doOnNext(response -> System.out.println(response.answer))
            // The blocking call here is just so the application doesn't exit until the demo is completed.
            .block();
}

static class RequestResponseService {
    private final ConcurrentHashMap<Long, Consumer<Response>> responses =
            new ConcurrentHashMap<>();

    Mono<Response> doRequest(Mono<Request> request) {
        return request.flatMap(rq -> doNonBlockingFireAndForgetRequest(rq)
                .then(Mono.create(sink -> responses.put(rq.correlationId, sink::success))));
    }

    private Mono<Void> doNonBlockingFireAndForgetRequest(Request request) {
        return Mono.fromRunnable(this::simulateResponses);
    }

    private void processResponse(Response response) {
        responses.get(response.correlationId).accept(response);
    }

    void simulateResponses() {
        // This is just to make the example work. Not part of the example.
        new Thread(() -> {
            try {
                // Simulate a delay.
                Thread.sleep(10_000);

                Response response = new Response();
                response.correlationId = 1;
                response.answer = "Si!";

                processResponse(response);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
 类似资料:
  • 我遇到了一个非常奇怪的问题,java线程正忙着等待。 我有一个线程忙于等待其他线程的静态变量的状态。假设忙碌等待的线程正在等待另一个线程的静态int变量达到某个值 如果我使用上面的代码,线程将被卡在忙等待中,不会跳出while循环,即使确实达到5。 但是,如果我使用其他代码,那么线程确实会跳出忙等待循环。有时,一旦达到5,其他时候会晚一点。但它会发生。对于我的特定示例,我将其用作“无意义的工作”

  • 问题内容: 我当前正在使用subprocess.call()来调用另一个程序,但是它将阻塞正在执行的线程,直到该程序完成。有没有一种方法可以简单地启动该程序而无需等待返回? 问题答案: 使用代替:

  • 我在Scala中有一个要求,即运行一系列http调用,这些调用必须按顺序完成且不阻塞。我怎样才能做到这一点?

  • 问题内容: 如果要使用Linq-SQL,还必须将DB Table拖到设计器表面以创建实体类。 我一直喜欢我的应用程序中的完全控制权,并且不喜欢dotnet创建的类。 是否可以使用我自己的数据访问层实体类在Linq和DB之间提供此连接? 我该如何完成? 问题答案: 您可以使用Linq-to-SQL非常轻松地编写自己的类-只需使用一些属性绘制类即可。 例如,这是我的一个项目中有一个非常简单的表,它可以

  • 我的布局有表面视图。有时当我的应用程序从后台切换到前台时,我会得到ANR。我认为原因是主线程被lock方法阻塞了。 最重要的部分是:

  • 问题内容: 我正在评估JOOQ是否可在仍在开发中的新系统中使用。我想避免在与应用程序一起开发数据库时生成代码,而只是为该应用程序起持久存储的作用。因此,预计数据库模式定义将由Java代码(java中的表定义)驱动。 JOOQ是否适合上述用例?是否有用于模式定义的Java DSL? 问题答案: JOOQ是否适合上述用例? 是的,许多jOOQ用户仅使用运行时库,而没有代码生成器。入门指南中提供了示例。