当前位置: 首页 > 知识库问答 >
问题:

如何强制CompletableFuture.thenApply()在运行前一阶段的相同线程上运行?

壤驷睿
2023-03-14

以下是我面临的问题的简短代码版本:

public static void main(String[] args) {
    CompletableFuture.supplyAsync(() -> {
                /*
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException ignored) {}
                */
                //System.out.println("supplyAsync: " + Thread.currentThread().getName());
                return 1;
            })
            .thenApply(i -> {
                System.out.println("apply: " + Thread.currentThread().getName());
                return i + 1;
            })
            .thenAccept((i) -> {
                System.out.println("accept: " + Thread.currentThread().getName());
                System.out.println("result: " + i);
            }).join();
}

这是我得到的输出:

apply: main
accept: main
result: 2

我很惊讶地看到main在那里!当我取消注释Thread.sleep()调用,甚至取消注释单个sysout语句时,我预计会发生这样的事情:

supplyAsync: ForkJoinPool.commonPool-worker-1
apply: ForkJoinPool.commonPool-worker-1
accept: ForkJoinPool.commonPool-worker-1
result: 2

我理解thenApplyAsync()将确保它不会在线程上运行,但是我想避免将供应商返回的数据从运行的线程传递到将要运行的线程然后在链中应用其他后续的代码。

共有1个答案

李兴安
2023-03-14

方法thenApply计算调用方线程中的函数,因为未来已经完成。当然,当你在供应商中插入一个睡眠时,未来还没有完成,thenApply被调用。即使是print语句也可能会减慢供应商的速度,使主线程首先调用thenApplythenAccess。但这不是可靠的行为,在重复运行代码时可能会得到不同的结果。

未来不仅不记得是哪个线程完成了它,而且也无法告诉任意线程执行特定代码。线程可能忙于其他事情,完全不合作,甚至在此期间终止。

只考虑

java prettyprint-override">ExecutorService s = Executors.newSingleThreadExecutor();
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
    System.out.println("supplyAsync: " + Thread.currentThread().getName());
    return 1;
}, s);
s.shutdown();
s.awaitTermination(1, TimeUnit.DAYS);
cf.thenApply(i -> {
    System.out.println("apply: " + Thread.currentThread().getName());
    return i + 1;
})
.thenAccept((i) -> {
    System.out.println("accept: " + Thread.currentThread().getName());
    System.out.println("result: " + i);
}).join();

我们怎么能期望传递给然后apply然后accept的函数在已经终止的池的工作线程中执行呢?

我们也可以写

CompletableFuture<Integer> cf = new CompletableFuture<>();

Thread t = new Thread(() -> {
    System.out.println("completing: " + Thread.currentThread().getName());
    cf.complete(1);
});
t.start();
t.join();

System.out.println("completer: " + t.getName() + " " + t.getState());
cf.thenApply(i -> {
    System.out.println("apply: " + Thread.currentThread().getName());
    return i + 1;
})
.thenAccept((i) -> {
    System.out.println("accept: " + Thread.currentThread().getName());
    System.out.println("result: " + i);
}).join();

它将打印相似的内容

completing: Thread-0
completer: Thread-0 TERMINATED
apply: main
accept: main
result: 2

显然,我们不能坚持让这个线程处理后续阶段。

但是,即使该线程是池中仍处于活动状态的工作线程,它也不知道它已经完成了未来,也不知道“处理后续阶段”的概念。在Executor抽象之后,它刚刚从队列中接收到一个任意的Runnable,在处理它之后,它继续其主循环,从队列中获取下一个Runnable

因此,一旦第一个未来已经完成,告诉它完成其他未来工作的唯一方法就是让任务排队。当使用然后使用applyasync指定相同的池或在没有执行器的情况下使用…Async方法执行所有操作(即使用默认池)时,会发生这种情况。

当您对所有使用单线程执行器时...异步方法,您可以确保所有操作都由同一个线程执行,但它们仍然会通过池的队列。即使如此,在已经完成的未来情况下,主线程实际上也在排队执行相关操作,线程安全队列以及同步开销是不可避免的。

但是请注意,即使在单个工作线程顺序处理所有依赖操作之前,您首先创建了依赖操作链,这种开销仍然存在。每个未来的完成都是通过以线程安全的方式存储新状态来完成的,使结果对所有其他线程都可能可见,并自动检查是否同时发生了并发完成(例如取消)。然后,由其他线程链接的依赖操作将在执行之前以线程安全的方式获取。

所有这些具有同步语义的操作都使得在拥有一系列依赖的CompletableFutures时,由同一线程处理数据不太可能有好处。

要使实际的本地处理具有潜在的性能优势,唯一的方法是使用

CompletableFuture.runAsync(() -> {
    System.out.println("supplyAsync: " + Thread.currentThread().getName());
    int i = 1;

    System.out.println("apply: " + Thread.currentThread().getName());
    i = i + 1;

    System.out.println("accept: " + Thread.currentThread().getName());
    System.out.println("result: " + i);
}).join();

或者,换句话说,如果您不希望分离处理,那么首先不要创建分离处理阶段。

 类似资料:
  • 我有一个多分支管道架构的以下Jenkinsfile 我试图在Ubuntu和Red Hat节点上并行运行“构建”阶段,而仅在Ubuntu节点上运行“测试”阶段。 任何人都可以帮助我指定如何选择在哪些节点上运行哪些阶段。我在网上找到的解决方案很少,但他们建议重写构建阶段两次:一次用于Red Hat节点,另一次用于Ubuntu节点。难道没有办法在没有代码重复的情况下做到这一点吗? 非常感谢

  • 问题内容: 假设我在同一台计算机上同时运行两个Java程序。这些程序将在单个JVM实例中运行还是在两个不同的JVM实例中运行? 问题答案: 如果您使用命令(从命令行)开始每个命令,它们将作为完全独立的JVM运行。 “程序”可以作为在一个JVM中运行的单独线程启动。

  • 我们都知道Java会彻底优化我们的代码,我们都喜欢它。嗯,大多数时候。下面是一段让我头疼的代码: 在快速、慢速的单线程和多线程处理器之间,结果可能会有所不同。在我测试的计算机上(没有doSomething),输出如下: CompareThread的前几次迭代运行良好,然后Java进行了“优化”:testValue和currentValue总是相等的,并且不断地改变它们的值,尽管线程从未离开最内层的

  • 问题内容: 我希望下面的gulp调用一个接一个地同步运行。但是他们不服从命令。 在运行序列节点模块没有帮助在这里,因为我并不想串联运行一饮而尽任务(即它的语法类似于等等),而是吞掉串联“来电”,你见下文。 如何使它们一个接一个地运行? 问题答案: 您可以将 异步 用作呼叫的控制流,以仅使它们完成一个任务,也避免了获得“金字塔效应”。因此,这样的事情对于您的用例应该是好的: 这还将使您能够进行一些错

  • 我正在运行RxJava并创建一个主题以使用方法生成数据。我正在使用Spring。 这是我的设置: 在RxJava流上生成新数据的方式是通过Autowire private SubjectObserver SubjectObserver,然后调用SubjectObserver。发布(newDataObjGenerated) 无论我为subscribeOn()指定了什么 Schedulers.io()

  • 问题内容: 我有一个正在运行的线程,但是从外面我无法绕过一个值来停止该线程。如何在内部发送false / true值或调用运行线程的公共方法?当我按下按钮1?例如: 或 跟进(校对): 问题答案: 如果您通过类而不是通过a定义它,则可以调用实例方法。 还要注意,由于多个内核具有自己的关联内存,因此您需要警告处理器该状态可能在另一个处理器上更改,并且它需要监视该更改。听起来很复杂,但只需将’vola