当前位置: 首页 > 知识库问答 >
问题:

Java CompletableFuture:仅有第一个结果

彭礼骞
2023-03-14

在阅读了Oracle站点上的这篇文章https://community.Oracle.com/docs/doc-995305之后,我将尝试实现“Some Two-to-One selection patterns”段落中描述的模式。这最后一类模式还包含二对一模式。但是这次不是执行一次下游元素,而是完成两个上游元素,当两个上游元素中的一个完成时执行下游元素。例如,当我们要解析域名时,这可能会证明非常有用。与其只查询一个域名服务器,我们可能会发现查询一组域名服务器更有效。我们不期望从不同的服务器得到不同的结果,因此我们不需要比我们得到的第一个更多的答案。可以安全地取消所有其他查询。

实现一个只有2个CompleatableFuture的场景很简单,但我无法实现有3个或更多个CompleatableFuture的场景。

我试过这个:

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
    CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
    CompletableFuture<String> cf4 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));

    cf1.applyToEither(
            cf2, s1 -> cf2.applyToEither(
                    cf3, s2 -> cf3.applyToEither(
                            cf4, s3 -> "First result is: " + s3))).thenAccept(System.out::println).join();

FutureMain是我的类,这是generateString方法

public static String generateString(String input) {
    Random r = new Random();
    int millis = r.nextInt(6) * 1000;
    System.out.println(input + " " + millis);
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return input + ": " + millis;
}

我成功地组合了多个CompleatableFuture,当我希望它们全部完成时:

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
    CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
    CompletableFuture<String> cf4 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));

    CompletableFuture<String> cf5 = CompletableFuture.allOf(cf1, cf2, cf3, cf4).thenApply(
            s -> elaborate(cf1.join(), cf2.join(), cf3.join(), cf4.join())); 

    cf5.thenAccept(System.out::println).join();

有什么建议吗?

共有1个答案

朱和惬
2023-03-14

二对一选择模式表示:

当两个上游元素中的一个完成时,执行下游元素。

例如,从两台服务器中选择一个用户,一台服务器返回一个用户,另一台服务器将被阻塞或由于某种原因稍后返回一个用户,无论哪个服务器已返回一个用户,下游都将执行。

//the first upstream is always blocked.
CompletableFuture<String> blocked = new CompletableFuture<>();
CompletableFuture<String> upstreams = Stream.of(cf1, cf2, cf3, cf4).reduce(blocked,
        (it, upstream) -> it.applyToEither(upstream, Function.identity()));

upstreams.thenAccept(System.out::println).join();// print "foo"

我从completableFeature中导入了用于打印问题的静态方法supplyAsync

CompletableFuture<String> cf1 = supplyAsync(blocked(String.class));
CompletableFuture<String> cf2 = supplyAsync(returnValueLater("bar"));
CompletableFuture<String> cf3 = supplyAsync(blocked(String.class));
CompletableFuture<String> cf4 = supplyAsync(returnValue("foo"));

CompletableFuture<String> upstreams = cf1.applyToEither(cf2, Function.identity())
                                         .applyToEither(cf3, Function.identity())
                                         .applyToEither(cf4, Function.identity());

upstreams.thenAccept(System.out::println).join();// print "foo"

private <T> Supplier<T> returnValue(T value) {
    return returnValue(() -> value);
}

private <T> Supplier<T> blocked(Class<T> type) {
    return returnValue(() -> {
        Thread.currentThread().join();
        return null;
    });
}

private <T> Supplier<T> returnValueLater(T value) {
    return returnValue(() -> {
        Thread.sleep(100);
        return value;
    });
}

private <T> Supplier<T> returnValue(Callable<T> value) {
    return () -> {
        try {
            return value.call();
        } catch (Exception e) { throw new RuntimeException(e); }
    };
}
import org.junit.jupiter.api.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.*;
import static java.util.Arrays.asList;
import static java.util.concurrent.CompletableFuture.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
public class CompletableFuturePatternTest {

    @Test @DisplayName("Two-to-One Selecting Pattern")
    void selectingManyToOne() throws Throwable {
        String user = select("select user from User", String.class)
                .from(availableServers())
                .getFirstResult();

        assertThat(user, equalTo("Joe"));
    }

    @Test @DisplayName("Two-to-One Combining Pattern")
    void combiningManyToOne() throws Throwable {
        List<String> users = select("select user from User", String.class)
                .from(availableServers())
                .list();

        assertThat(users, equalTo(asList("Bob", "Joe", "Doe")));
    }

    @Test @DisplayName("One-to-One Pattern")
    void waitUntilUpstreamCompleted() throws Throwable {
        String user = select("select user from User", String.class)
                .from(availableServers())
                .to(String::toUpperCase);

        assertThat(user, equalTo("JOE"));
    }

    private CompletableFuture<String>[] availableServers() {
        return new CompletableFuture[]{
                server(returnValueLater("Bob")),
                server(returnValue("Joe")),
                server(returnValueLater("Doe")),
        };
    }

    private <T> CompletableFuture<T> server(Supplier<T> supplier) {
        return supplyAsync(supplier);
    }

    private <T> Supplier<T> returnValue(T value) {
        return returnValue(() -> value);
    }


    private <T> Supplier<T> returnValueLater(T value) {
        return returnValue(() -> {
            Thread.sleep(500);
            return value;
        });
    }

    private <T> Supplier<T> returnValue(Callable<T> value) {
        return () -> {
            try {
                return value.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    private <T> Query<T> select(String query, Class<T> type) {
        return new Query<T>() {
            private CompletableFuture<T>[] upstreams;

            @Override
            public Query<T> from(CompletableFuture<T>... upstreams) {
                this.upstreams = upstreams;
                return this;
            }

            @Override
            public T getFirstResult() throws Exception {
                return selecting().get();
            }

            @Override
            public <R> R to(Function<T, R> mapper) throws Exception {
                return selecting().thenApply(mapper).get();
            }

            private CompletableFuture<T> selecting() {
                return upstreams(blocked(), this::selecting);
            }

            private CompletableFuture<T> selecting(CompletableFuture<T> primary,
                                                   CompletableFuture<T> upstream) {
                return primary.applyToEitherAsync(upstream, Function.identity());
            }

            private CompletableFuture<T> blocked() {
                return new CompletableFuture<>();
            }

            @Override
            public List<T> list() throws Exception {
                return upstreams(collector(), this::combine, this::combiner).get();
            }

            private CompletableFuture<List<T>> collector() {
                return completedFuture(new ArrayList<>());
            }

            private CompletableFuture<List<T>> combine(CompletableFuture<List<T>> primary,
                                                       CompletableFuture<T> upstream) {
                return primary.thenCombineAsync(upstream, this::concat);
            }

            private List<T> concat(List<T> result, T value) {
                result.add(value);
                return result;
            }

            private CompletableFuture<List<T>> combiner(CompletableFuture<List<T>> primary
                    , CompletableFuture<List<T>> secondary) {

                return primary.thenCombineAsync(secondary, this::concat);
            }

            private <T> List<T> concat(List<T> primary, List<T> secondary) {
                primary.addAll(secondary);
                return primary;
            }

            private CompletableFuture<T> upstreams(CompletableFuture<T> identity,
                                                   BinaryOperator<CompletableFuture<T>> accumulator) {
                return upstreams(identity, accumulator, accumulator);
            }

            private <U> CompletableFuture<U> upstreams(CompletableFuture<U> identity
                    , BiFunction<CompletableFuture<U>, CompletableFuture<T>, CompletableFuture<U>> accumulator
                    , BinaryOperator<CompletableFuture<U>> combiner) {
                return Stream.of(upstreams).reduce(identity, accumulator, combiner);
            }

        };
    }

    interface Query<T> {
        Query<T> from(CompletableFuture<T>... upstreams);

        T getFirstResult() throws Exception;

        <R> R to(Function<T, R> mapper) throws Exception;

        List<T> list() throws Exception;
    }
}
 类似资料:
  • 你好,我在Java中使用正则表达式时遇到了一个问题。 我试图解析这个: 使用此代码(模式匹配器) 我的问题是,我的regexp只返回模式的第一次出现,即使我有一段时间(matcher.find())。。

  • 我正在使用Android中的Speech认知器和识别器来实现语音识别。我的目标是在我的语音识别器在屏幕上显示结果后重新开始听语音。为此,我使用以下代码。 问题是,第一次运行正常并显示结果,但在第二次开始侦听(从onResults方法调用)后,由于某种原因,它听不到正在说的内容。然后它给出一个ERROR\u SPEECH\u TIMEOUT错误,这意味着没有语音输入。在Logcat上,我可以看到它进

  • 问题就在这里:我创建了世界上最简单的RecyclerView,但它只显示第一个项目。我不明白为什么。谢谢你的帮助。 item_layout.xml mainactivity.java

  • Q1。我的理解是。但是超时场景呢? Q2。在哪里检查完整未来的默认超时设置?如何更改它?未来超时后会发生什么?(完成还是异常?) Q3。只要未来“完成”(完成或超时或任何最后阶段),我就需要。是否有保证在未来“完成”后调用的方法?我应该把放在哪里? 从新到完整的未来。更喜欢用Java8回答。谢谢你的帮助。

  • 问题内容: 如何只显示第一个角元素? 我正在这样使用: 但是由于我没有重复,所以我不必使用吗?我如何才能只显示第一个,而不必进行ng-repeat? 问题答案: 不要使用ng-repeat指令,这应该可以工作:

  • 我试图创建一个observate,它从firebase查询返回一个列表。问题是,当我调用onNext发出项目,然后调用onComplete时,它会停止发出第一个项目之后的项目,而根本不调用onComplete不会发出任何东西。有没有正确的方法来实现我想要实现的目标?我对RxJava还是很陌生,请原谅我的无知。感谢您的帮助:)