当前位置: 首页 > 知识库问答 >
问题:

用于Java CompletableFuture合成的线程?

祁博涛
2023-03-14

我已经开始熟悉Java的CompletableFuture组合了,之前我使用过JavaScript。基本上,合成只是在指定的执行器上调度链式命令。但我不确定在执行合成时运行的是哪个线程。

假设我有两个执行者,executor1和executor2;为了简单起见,假设它们是独立的线程池。我计划了一个完整的未来(使用非常松散的描述):

CompletableFuture<Foo> futureFoo = CompletableFuture.supplyAsync(this::getFoo, executor1);

完成后,我使用第二个执行器将Foo转换为Bar

CompletableFuture<Bar> futureBar .thenApplyAsync(this::fooToBar, executor2);

我知道将从executor1线程池中的线程调用getFoo()。我知道将从executor2线程池中的线程调用fooToBar()

但是实际合成使用的线程是什么,即在getFoo()完成并且futureFoo()完成之后;但是在执行器2上调度fooToBar()命令之前?换句话说,哪个线程实际运行代码来调度第二个执行器上的第二个命令?

调度是否作为调用getFoo()的executor1中相同线程的一部分执行?如果是这样,这个可完成的未来组合是否等同于我在执行任务1的第一个命令中手动调度自己?


共有2个答案

金旺
2023-03-14

最后是一个简单的程序,它喜欢您的代码片段并允许您使用它。

输出确认当它等待的条件准备就绪时,您提供的执行程序被调用完成(除非您足够早地显式调用完成-这将发生在完成的调用线程中)-Future上的get()块,直到Future完成。

提供一个参数-有一个执行器1和执行器2,不提供参数,只有一个执行器。输出是(同一个执行器-在同一个执行器中依次作为单独的任务运行)-

In thread Thread[main,5,main] - getFoo
In thread Thread[main,5,main] - getFooToBar
In thread Thread[pool-1-thread-1,5,main] - Supplying Foo
In thread Thread[pool-1-thread-1,5,main] - fooToBar
In thread Thread[main,5,main] - Completed

或者(两个执行器-事情再次按顺序运行,但使用不同的执行器)-

In thread Thread[main,5,main] - getFoo
In thread Thread[main,5,main] - getFooToBar
In thread Thread[pool-1-thread-1,5,main] - Supplying Foo
In thread Thread[pool-2-thread-1,5,main] - fooToBar
In thread Thread[main,5,main] - Completed

请记住:带有执行器的代码(在本例中,可以立即在另一个线程中启动。.getFoo是在设置FooToBar之前调用的)。

代码如下-

package your.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

public class TestCompletableFuture {
    private static void dumpWhichThread(final String msg) {
        System.err.println("In thread " + Thread.currentThread().toString() + " - " + msg);
    }

    private static final class Foo {
        final int i;
        Foo(int i) {
            this.i = i;
        }
    };
    public static Supplier<Foo> getFoo() {
        dumpWhichThread("getFoo");
        return new Supplier<Foo>() {
            @Override
            public Foo get() {
                dumpWhichThread("Supplying Foo");
                return new Foo(10);
            }

        };
    }

    private static final class Bar {
        final String j;
        public Bar(final String j) {
            this.j = j;
        }
    };
    public static Function<Foo, Bar> getFooToBar() {
        dumpWhichThread("getFooToBar");
        return new Function<Foo, Bar>() {
            @Override
            public Bar apply(Foo t) {
                dumpWhichThread("fooToBar");
                return new Bar("" + t.i);
            }
        };
    }


    public static void main(final String args[]) throws InterruptedException, ExecutionException, TimeoutException {
        final TestCompletableFuture obj = new TestCompletableFuture();
        obj.running(args.length == 0);
    }

    private String running(final boolean sameExecutor) throws InterruptedException, ExecutionException, TimeoutException {
        final Executor executor1 = Executors.newSingleThreadExecutor(); 
        final Executor executor2 = sameExecutor ? executor1 : Executors.newSingleThreadExecutor(); 
        CompletableFuture<Foo> futureFoo = CompletableFuture.supplyAsync(getFoo(), executor1);
        CompletableFuture<Bar> futureBar = futureFoo.thenApplyAsync(getFooToBar(), executor2);
        try {
            // Try putting a complete here before the get ..
            return futureBar.get(50, TimeUnit.SECONDS).j;
        }
        finally {
            dumpWhichThread("Completed");
        }
    }
}

哪一个线程触发了Bar阶段的进程——在上面——它是executor1。通常,完成未来的线程(即给它一个值)是释放依赖于它的事物的线程。如果你在主线程上立即完成了FutureFoo,那么它就是触发它的线程。

所以你必须小心。如果你有“N”件事情都在等待未来的结果——但只使用一个线程执行器——那么第一个计划的执行器会阻止该执行器,直到它完成。你可以推断为M个线程,N个未来——它可能会衰减为“M”锁,阻止其余事情的进展。

朱炳
2023-03-14

这是故意未指定的。实际上,当调用不带Async后缀的变体并表现出类似行为时,它将由处理链式操作的相同代码处理。

所以当我们使用以下测试代码时

CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    return "";
}, r -> new Thread(r, "A").start())
.thenAcceptAsync(s -> {}, r -> {
    System.out.println("scheduled by " + Thread.currentThread());
    new Thread(r, "B").start();
});

可能会打印出来

scheduled by Thread[A,5,main]

因为完成前一阶段的线程用于调度依赖操作。

然而,当我们使用

CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "",
    r -> new Thread(r, "A").start());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
first.thenAcceptAsync(s -> {}, r -> {
    System.out.println("scheduled by " + Thread.currentThread());
    new Thread(r, "B").start();
});

可能会打印出来

scheduled by Thread[main,5,main]

当主线程调用AcceptAsync时,第一个未来已经完成,主线程将调度操作本身。

但这并不是故事的结局。当我们使用

CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(5));
    return "";
}, r -> new Thread(r, "A").start());

Set<String> s = ConcurrentHashMap.newKeySet();
Runnable submitter = () -> {
    String n = Thread.currentThread().getName();
    do {
        for(int i = 0; i < 1000; i++)
            first.thenAcceptAsync(x -> s.add(n+" "+Thread.currentThread().getName()),
                Runnable::run);
    } while(!first.isDone());
};
Thread b = new Thread(submitter, "B");
Thread c = new Thread(submitter, "C");
b.start();
c.start();
b.join();
c.join();
System.out.println(s);

它不仅可以打印第一个场景中的组合A和C,还可以打印第二个场景中的组合B和C。在我的机器上,它还可以重复打印组合C和B,这表明一个线程传递给AcceptAsync的操作被另一个线程提交给执行器,另一个线程调用不同的操作。

这与此答案中描述的评估传递给thenApply的函数的线程的场景相匹配(没有Async)。正如开头所说,这是我所期望的,因为这两件事很可能由相同的代码处理。但与评估传递给thenApply的函数的线程不同,在Execitor上调用执行方法的线程甚至没有在留档中提及。因此理论上,另一个实现可以使用完全不同的线程,既不在未来调用方法也不完成它。

 类似资料:
  • Q1。我的理解是。但是超时场景呢? Q2。在哪里检查完整未来的默认超时设置?如何更改它?未来超时后会发生什么?(完成还是异常?) Q3。只要未来“完成”(完成或超时或任何最后阶段),我就需要。是否有保证在未来“完成”后调用的方法?我应该把放在哪里? 从新到完整的未来。更喜欢用Java8回答。谢谢你的帮助。

  • 使用此文档了解 After Effects 中用于 VR/360 视频的合成工具 概述 在 After Effects 中使用 VR 合成编辑器像处理常规素材一样处理您的 360 度素材。该编辑器为您提供基于视图的编辑,让您像通过 VR 头盔一般查看素材。VR 合成编辑器中的合成工具使用 2D 和 3D 编辑来构建球面合成空间。使用此编辑器可在球面合成空间的编辑模式之间切换,并查看最终输出。 使用

  • 在线语音合成(tts) 概述 该API将文本转换为语音文件,支持不同编码格式和采样率. 调用示例 curl -sSL -v -X POST "https://ai.nationalchip.com/api/v1/tts" -H "accept: */*" -H "Authorization: Bearer ${access_token}" -H "Content-Type: applicatio

  • 本文向大家介绍详解WordPress中用于合成数组的wp_parse_args()函数,包括了详解WordPress中用于合成数组的wp_parse_args()函数的使用技巧和注意事项,需要的朋友参考一下 wp_parse_args() 函数是 WordPress 核心经常用到的函数,它的用途很多,但最主要用来给一个数组参数(args)绑定默认值。 因为 wp_parse_args() 函数返回

  • 我有一个JMeter测试计划,其中包含具有不同工作负载和吞吐量的多个线程组。我想使用吞吐量成形计时器,但只对一个线程组应用成形。如果我在线程组中有计时器,它似乎仍然作用于整个测试计划。 例如,如果我将其设置为每秒6个请求,并运行测试10分钟,则在“查看结果”树中会得到3600个条目(这是预期的)。不幸的是,这3600个条目包括来自其他线程组的请求。我希望只从这个线程组中获得3600个条目,然后从其

  • 我运行一个大型的minecraft服务器,minecraft服务器端是单线程的。一切都是在主游戏循环中完成的。如果Mojang使minecraft服务器端多线程化,minecraft服务器每年将节省200万美元,因为租用的硬件更少。 不管怎样,我听说过这些谣言和理论。我从来都无法用谷歌搜索并弄清楚。 有没有必要使用多核cpu,并将其转换为单核、单线程? 我一直在猜测虚拟机管理程序软件将运行多线程,