Project Reactor是否可以在一个mono中等待一个事件/条件,而不需要使用每个mono的阻塞线程?使用CompletableFuture
,我可以完成这样的事情,但我不知道如何使用Project Reactor。
我的问题是我需要将请求与响应相关联。响应时间变化很大,有些甚至永远不会得到回复和超时。在客户端,每个请求阻塞线程不是问题,但由于这是一个服务器应用程序,我不想最终导致每个请求产生一个线程阻塞等待响应。
API如下所示:
Mono<Response> doRequest(Mono<Request> request);
由于我不知道如何使用Reactor来实现这一点,所以我将解释如何使用CompletableFuture
来实现这一点,以澄清我要寻找的内容。API如下所示:
CompletableFuture<Response> doRequest(Request request);
当调用方调用时,会向服务器发出一个请求,该请求中有一个由该方法生成的相关ID。调用方返回一个CompletableFuture
,该方法在映射中存储对该CompletableFuture
的引用,并将相关ID作为键。
还有一个线程(池)接收服务器的所有响应。当它收到响应时,它从响应中获取相关ID,并使用它在映射中查找原始请求(即CompletableFuture
),并对其调用Complete(response);
。
在此实现中,每个请求不需要阻塞线程。这基本上更像是一种vert.x/Netty的思维方式?我想知道如何实现这样的事情(如果可能的话)与项目Reactor。
编辑25-07-2019:
根据注释中的请求,为了澄清我在下面得到的内容,我将用completeablefuture
实现这一点。
我还注意到我犯了一个可能相当令人困惑的错误:在CompletableFuture
示例中,我传递了一个mono
作为参数。这本应该只是一个“正常”的论点。我很抱歉,我希望我没有让人们太困惑。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
class NonBlockingCorrelatingExample {
/**
* This example shows how to implement correlating requests with responses without needing a (sleeping)
* thread per request to wait for the response with the use of {@link CompletableFuture}'s.
*
* So the main feat of this example is that there is always a fixed (small) number of threads used even if one
* would fire a thousands requests.
*/
public static void main(String[] args) throws Exception {
RequestResponseService requestResponseService = new RequestResponseService();
Request request = new Request();
request.correlationId = 1;
request.question = "Do you speak Spanish?";
CompletableFuture<Response> responseFuture = requestResponseService.doRequest(request);
responseFuture.whenComplete((response, throwable) -> System.out.println(response.answer));
// The blocking call here is just so the application doesn't exit until the demo is completed.
responseFuture.get();
}
static class RequestResponseService {
/** The key in this map is the correlation ID. */
private final ConcurrentHashMap<Long, CompletableFuture<Response>> responses = new ConcurrentHashMap<>();
CompletableFuture<Response> doRequest(Request request) {
Response response = new Response();
response.correlationId = request.correlationId;
CompletableFuture<Response> reponseFuture = new CompletableFuture<>();
responses.put(response.correlationId, reponseFuture);
doNonBlockingFireAndForgetRequest(request);
return reponseFuture;
}
private void doNonBlockingFireAndForgetRequest(Request request) {
// In my case this is where the request would be published on an MQTT broker (message bus) in a request topic.
// Right now we will just make a call which will simulate a response message coming in after a while.
simulateResponses();
}
private void processResponse(Response response) {
// There would usually be a (small) thread pool which is subscribed to the message bus which receives messages
// in a response topic and calls this method to handle those messages.
CompletableFuture<Response> responseFuture = responses.get(response.correlationId);
responseFuture.complete(response);
}
void simulateResponses() {
// This is just to make the example work. Not part of the example.
new Thread(() -> {
try {
// Simulate a delay.
Thread.sleep(10_000);
Response response = new Response();
response.correlationId = 1;
response.answer = "Si!";
processResponse(response);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
static class Request {
long correlationId;
String question;
}
static class Response {
long correlationId;
String answer;
}
}
是的,有可能。您可以使用reactor.core.publisher.mono#create
方法来实现
对于您的示例:
public static void main(String[] args) throws Exception {
RequestResponseService requestResponseService = new RequestResponseService();
Request request = new Request();
request.correlationId = 1;
request.question = "Do you speak Spanish?";
Mono<Request> requestMono = Mono.just(request)
.doOnNext(rq -> System.out.println(rq.question));
requestResponseService.doRequest(requestMono)
.doOnNext(response -> System.out.println(response.answer))
// The blocking call here is just so the application doesn't exit until the demo is completed.
.block();
}
static class RequestResponseService {
private final ConcurrentHashMap<Long, Consumer<Response>> responses =
new ConcurrentHashMap<>();
Mono<Response> doRequest(Mono<Request> request) {
return request.flatMap(rq -> doNonBlockingFireAndForgetRequest(rq)
.then(Mono.create(sink -> responses.put(rq.correlationId, sink::success))));
}
private Mono<Void> doNonBlockingFireAndForgetRequest(Request request) {
return Mono.fromRunnable(this::simulateResponses);
}
private void processResponse(Response response) {
responses.get(response.correlationId).accept(response);
}
void simulateResponses() {
// This is just to make the example work. Not part of the example.
new Thread(() -> {
try {
// Simulate a delay.
Thread.sleep(10_000);
Response response = new Response();
response.correlationId = 1;
response.answer = "Si!";
processResponse(response);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
我遇到了一个非常奇怪的问题,java线程正忙着等待。 我有一个线程忙于等待其他线程的静态变量的状态。假设忙碌等待的线程正在等待另一个线程的静态int变量达到某个值 如果我使用上面的代码,线程将被卡在忙等待中,不会跳出while循环,即使确实达到5。 但是,如果我使用其他代码,那么线程确实会跳出忙等待循环。有时,一旦达到5,其他时候会晚一点。但它会发生。对于我的特定示例,我将其用作“无意义的工作”
问题内容: 我当前正在使用subprocess.call()来调用另一个程序,但是它将阻塞正在执行的线程,直到该程序完成。有没有一种方法可以简单地启动该程序而无需等待返回? 问题答案: 使用代替:
我在Scala中有一个要求,即运行一系列http调用,这些调用必须按顺序完成且不阻塞。我怎样才能做到这一点?
问题内容: 如果要使用Linq-SQL,还必须将DB Table拖到设计器表面以创建实体类。 我一直喜欢我的应用程序中的完全控制权,并且不喜欢dotnet创建的类。 是否可以使用我自己的数据访问层实体类在Linq和DB之间提供此连接? 我该如何完成? 问题答案: 您可以使用Linq-to-SQL非常轻松地编写自己的类-只需使用一些属性绘制类即可。 例如,这是我的一个项目中有一个非常简单的表,它可以
我的布局有表面视图。有时当我的应用程序从后台切换到前台时,我会得到ANR。我认为原因是主线程被lock方法阻塞了。 最重要的部分是:
问题内容: 我正在评估JOOQ是否可在仍在开发中的新系统中使用。我想避免在与应用程序一起开发数据库时生成代码,而只是为该应用程序起持久存储的作用。因此,预计数据库模式定义将由Java代码(java中的表定义)驱动。 JOOQ是否适合上述用例?是否有用于模式定义的Java DSL? 问题答案: JOOQ是否适合上述用例? 是的,许多jOOQ用户仅使用运行时库,而没有代码生成器。入门指南中提供了示例。