当前位置: 首页 > 知识库问答 >
问题:

单声道vs复杂未来

封鸿雪
2023-03-14

CompletableFuture在单独的线程上执行任务(使用线程池)并提供回调函数。假设我在CompletableFuture中有一个API调用。那是API调用阻塞吗?线程会被阻塞,直到它没有得到API的响应吗?(我知道主线程/tomcat线程将是非阻塞的,但是CompletableFuture任务正在执行的线程呢?)

据我所知,单声道是完全无阻塞的。

请阐明这一点,如果我错了,请纠正我。

共有2个答案

蔚和安
2023-03-14

基于Oleh的回答,CompletableFuture可能的懒惰解决方案是

public CompletableFuture myNonBlockingHttpCall(CompletableFuture<ExecutorService> dispatch, Object someData) {
    var uncompletedFuture = new CompletableFuture(); // creates uncompleted future

    dispatch.thenAccept(x -> x.submit(() -> {
        myAsyncHttpClient.execute(someData, (result, exception -> {
            if(exception != null) {
                uncompletedFuture.completeExceptionally(exception);
                return;
            }
            uncompletedFuture.complete(result);
        })
    }));

    return uncompletedFuture;
}

然后,以后你就简单地做

dispatch.complete(executor);

这将使CompletableFuture等效于Mono,但我想没有背压。

淳于涛
2023-03-14

CompletableFuture的一个特点是它是真正异步的,它允许您从调用线程异步运行任务,而API(如thenXXX)允许您在结果可用时处理它。另一方面,CompletableFuture并不总是非阻塞的。例如,当您运行以下代码时,它将在默认的ForkJoinPool上异步执行:

CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {

    }

    return 1;
});

很明显,执行任务的ForkJoinPool中的Thread最终会被阻塞,这意味着我们不能保证调用是非阻塞的。

另一方面,CompletableFuture公开了API,允许您使其真正无阻塞。

例如,您可以始终执行以下操作:

public CompletableFuture myNonBlockingHttpCall(Object someData) {
    var uncompletedFuture = new CompletableFuture(); // creates uncompleted future

    myAsyncHttpClient.execute(someData, (result, exception -> {
        if(exception != null) {
            uncompletedFuture.completeExceptionally(exception);
            return;
        }
        uncompletedFuture.complete(result);
    })

    return uncompletedFuture;
}

如您所见,CompletableFuturefuture的API为您提供了completecompleteeexceptional方法,它们可以在需要时完成执行,而不阻塞任何线程。

在上一节中,我们对CF行为进行了概述,但是CompletableFuture和Mono之间的主要区别是什么?

值得一提的是,我们也可以做单声道屏蔽。没有人阻止我们写以下内容:

Mono.fromCallable(() -> {
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {

    }

    return 1;
})

当然,一旦我们订阅了未来,调用者线程就会被阻止。但是我们总是可以通过提供一个额外的subbeOn运算符来解决这个问题。尽管如此,Mono的更广泛的API并不是关键功能。

为了理解CompletableFutureMono之间的主要区别,让我们回到前面提到的myNonBlockingHttpCall方法实现。

public CompletableFuture myUpperLevelBusinessLogic() {
    var future = myNonBlockingHttpCall();

    // ... some code

    if (something) {
       // oh we don't really need anything, let's just throw an exception
       var errorFuture = new CompletableFuture();
       errorFuture.completeExceptionally(new RuntimeException());

       return errorFuture;
    }

   return future;
}

CompletableFuture的情况下,一旦调用该方法,它就会急切地执行对另一个服务/资源的HTTP调用。即使我们在验证了一些前/后条件后并不真正需要执行的结果,它也会开始执行,并且会为这项工作分配额外的CPU/DB-Connections/What-Ever-Machine-Resources。

相比之下,Mono类型根据定义是惰性的:

public Mono myNonBlockingHttpCallWithMono(Object someData) {
    return Mono.create(sink -> {
            myAsyncHttpClient.execute(someData, (result, exception -> {
                if(exception != null) {
                    sink.error(exception);
                    return;
                }
                sink.success(result);
            })
    });
} 

public Mono myUpperLevelBusinessLogic() {
    var mono = myNonBlockingHttpCallWithMono();

    // ... some code

    if (something) {
       // oh we don't really need anything, let's just throw an exception

       return Mono.error(new RuntimeException());
    }

   return mono;
}

在这种情况下,在订阅最终的mono之前不会发生任何事情。因此,只有当mynonblockingTTPCallWithMono方法返回的Mono被订阅时,才会向Mono提供逻辑。创建(消费者)将被执行。

我们可以走得更远。我们可以让我们的执行更加迟缓。您可能知道,Mono扩展了反应流规范中的Publisher。反应流的突出特点是背压支持。因此,使用MonoAPI,我们只能在真正需要数据时执行,并且我们的订户已经准备好使用它们:

Mono.create(sink -> {
    AtomicBoolean once = new AtomicBoolean();
    sink.onRequest(__ -> {
        if(!once.get() && once.compareAndSet(false, true) {
            myAsyncHttpClient.execute(someData, (result, exception -> {
                if(exception != null) {
                    sink.error(exception);
                    return;
                }
                sink.success(result);
            });
        }
    });
});

在此示例中,我们仅在订阅者调用Subscription#request时执行数据,因此它声明它已准备好接收数据。

  • CompletableFuture是异步的,可以是非阻塞的
  • CompletableFuture迫在眉睫。你不能推迟执行。但是你可以取消它们(总比没有好)
  • Mono是异步/非阻塞的,通过使用不同的操作符组合主Mono,可以轻松地在不同的线程上执行任何调用
  • Mono确实很懒,允许订阅者延迟执行启动,并随时准备使用数据

 类似资料:
  • 给定以下monos: 双: 和: 具有相同的输出: 和之间有什么区别,在这种情况下? 从https://projectreactor.io/docs/core/release/reference/index.html#which-operator: [如果你]有一个序列,但[你]对值不感兴趣,并且[你]想在最后切换到另一个单声道,[使用]。 [如果您]希望通过将发布者从1个Mono和任何源端协调到

  • 问题内容: 为了帮助理解monad是什么,有人可以使用Java提供示例吗?有可能吗? 如果您从此处http://jdk8.java.net/lambda/下载预发行版本的兼容Lambda的JDK8,则可以使用Java使用Lambda表达式。 下面显示了使用此JDK的lambda示例,有人可以提供相对简单的monad吗? 问题答案: 仅供参考: 提出的JDK8可选类 确实满足 了Monad的三个定律

  • 有些方法返回。这是一个典型的模式,分多个层: 假设返回一个(它向数据库写入一些内容,发送一条消息等)。如果我只是用替换它,那么一切都按预期工作,否则就不是这样了。更具体地说,单声道从未完成,它贯穿所有处理,但在结束时错过了终止信号。所以这些东西实际上是写在数据库中的,消息实际上是发送的,等等。 为了证明没有终止是问题所在,这里有一个有效的黑客: null 我不能在应用程序之外用简单的单工作流复制这

  • 我最近一直在学习使用Java中的reactor库和Spring框架进行反应式编程,并且在很大程度上我已经能够掌握它。然而,我发现自己有好几次遇到同样的情况,我想知道我哪里出了问题。 我正在努力解决的问题的要点是,我经常想用mono做一些事情,比如找到一些补充数据,然后将其添加回原始mono中。zip函数在我看来是一个理想的候选函数,但最终我订阅了两次原始mono,这不是我的意图。 这里有一个人为的

  • 我不知道或问这个问题,除了这里,如果不是我道歉的地方。 这里有一个REST请求,它附带了一个简单的bean,其中包含一个列表等属性。对此列表进行迭代,以使用返回Mono(findOne)的响应mongo调用。但我不认为我找到了正确的方法: 在我看来,“反应性”的想法并不是必须做一个块,但我没有找到如何做,否则。 有人能帮我找到做这项任务的最佳方法吗?

  • 问题内容: 我在这里遇到以下问题:我得到了一个代表音频数据的字节块(uint16_t *),并且生成它们的设备正在捕获单声道声音,因此很明显,我在1通道上具有单声道音频数据。我需要将此数据传递到另一台设备,该设备需要交错的立体声数据(因此为2个通道)。我要做的基本上是复制数据中的1通道,以便立体声数据的两个通道都包含相同的字节。您能指出一个有效的算法吗? 谢谢,f。 问题答案: 如果只需要交错的立