刚刚开始探索reactor项目及其抽象、Mono和Flux,并希望了解与Java8 barebones CompletableFuture的基本区别。
下面是我的一个简单代码:
public static void main(String[] args) throws Exception {
Mono.fromCallable(() -> getData())
.map(s -> s + " World ")
.subscribe(s -> System.out.println(s));
CompletableFuture.supplyAsync(() -> getData())
.thenAccept(System.out::println);
System.out.println(Thread.currentThread()+" End ");
}
private static String getData() {
int j=0;
for(int i=0; i<Integer.MAX_VALUE; i++){
j = j - i%2;
}
System.out.println(Thread.currentThread()+" - "+j);
return " Hello ";
}
首先,CompletableFuture
并不奇怪。SupplyAsync
通过ForkJoinPool调度函数的执行,“end”行立即打印,程序终止,因为主线程在这里确实很短--正如预期的那样。
但是mono.fromcallable(...)
将主线程阻塞在那里。此外,在getData()
函数中打印的线程名是主线程。因此,我看到的是顺序/阻塞行为,而不是顺序/非阻塞(异步)行为。是因为我在同一个线程上应用了订阅函数,所以它阻塞了吗?有人能解释一下吗?
是因为我在同一个线程上应用了订阅函数,所以它阻塞了吗?
这正是似乎发生的事情。
这种特定的行为让我有点惊讶,因为这不是大多数管道的行为方式。大多数管道都有这样或那样的方式,其中的一些操作使管道异步。publishon
、subscribeon
是明显的例子,但flatmap
也可能具有这样的效果,可能还有许多其他效果。在这些情况下,subscribe将立即返回。
不过,这提示了反应式编程的一个非常重要的问题:管道不应该包含长的阻塞调用。准备一个反应管道,并在订阅时不阻塞地处理事件。因此,阻塞语句有可能阻塞整个执行。通过使用调度器
,您可以将此类调用限制在特殊的线程池中,从而控制它们的效果。
我创建了一个简单的Kafka使用者,它返回一个对象流(接收到的消息),我试图使用测试它。 在我的测试中,我做了类似的事情: 断言工作正常(如果将值从更改为其他值,则测试失败)。但是,如果断言通过,测试永远不会退出。 我还尝试使用方法,如下所示: 在本例中,我得到以下错误: 你知道我做错了什么吗?
我用的是Spring WebFlux,Reactor核心。我有一个疑问。Spring webflux是否遵循每个请求的线程模型?我想问的是,在spring webflux中,一个请求可以在多个TOMCAT线程中执行吗?
来自javadocs, 如果任何给定的CompletableFutures异常完成,那么返回的CompletableFutures也会这样做,CompletionException将此异常作为其原因。 如果异常完成,则返回的CompletableFuture也会这样做,CompletionException将此异常作为其原因。 这是否意味着allOf()和anyOf()在任何Completable
嗨,我正在尝试创建一个模式内的地图。但是地图只显示了一部分。我尝试过在节点创建后使其无效,但它似乎不起作用。谢谢!
如何使用5个CompletableFutures异步执行20个可运行任务(或1个任务20次)? 这就是我得到的: 如果我执行这段代码,我可以看到它只运行3次。异步获取():3,然后在1 for()迭代中剩下2 所以,我想做所有20个任务,尽可能异步
问题内容: 我尽力而为,但没有找到任何文章和博客可以清楚地比较和,并且提供了很好的分析。 因此,如果任何人都可以向我解释或指向这样的博客或文章,那对我来说真的非常好。 问题答案: 无论 ListenableFuture 和 CompletableFuture 有超过它的父类的优势 未来 通过允许呼叫者在这样或那样的回调“注册”当异步动作已经完成被调用。 使用 Future, 您可以执行以下操作: