当前位置: 首页 > 知识库问答 >
问题:

如何组合可观察对象以避免给定的嵌套和依赖回调?

楚帅
2023-03-14

在这个博客中,他给出了这个(复制/粘贴以下代码)回调地狱的例子。但是,没有提到如何使用反应式扩展来消除问题。

所以这里F3取决于F1的完成,而F4和F5取决于F2的完成。

  1. 想知道Rx中的功能等效物是什么
  2. 如何在Rx中表示F1、F2、F3、F4和F5都应异步拉取

注意:我目前正在尝试将我的头围绕Rx,所以在问这个问题之前我没有尝试解决这个例子。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackB {

    /**
     * Demonstration of nested callbacks which then need to composes their responses together.
     * <p>
     * Various different approaches for composition can be done but eventually they end up relying upon
     * synchronization techniques such as the CountDownLatch used here or converge on callback design
     * changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
     */
    public static void run() throws Exception {
        final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
        /* the following are used to synchronize and compose the asynchronous callbacks */
        final CountDownLatch latch = new CountDownLatch(3);
        final AtomicReference<String> f3Value = new AtomicReference<String>();
        final AtomicReference<Integer> f4Value = new AtomicReference<Integer>();
        final AtomicReference<Integer> f5Value = new AtomicReference<Integer>();

        try {
            // get f3 with dependent result from f1
            executor.execute(new CallToRemoteServiceA(new Callback<String>() {

                @Override
                public void call(String f1) {
                    executor.execute(new CallToRemoteServiceC(new Callback<String>() {

                        @Override
                        public void call(String f3) {
                            // we have f1 and f3 now need to compose with others
                            System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5"));
                            // set to thread-safe variable accessible by external scope 
                            f3Value.set(f3);
                            latch.countDown();
                        }

                    }, f1));
                }

            }));

            // get f4/f5 after dependency f2 completes 
            executor.execute(new CallToRemoteServiceB(new Callback<Integer>() {

                @Override
                public void call(Integer f2) {
                    executor.execute(new CallToRemoteServiceD(new Callback<Integer>() {

                        @Override
                        public void call(Integer f4) {
                            // we have f2 and f4 now need to compose with others
                            System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5"));
                            // set to thread-safe variable accessible by external scope 
                            f4Value.set(f4);
                            latch.countDown();
                        }

                    }, f2));
                    executor.execute(new CallToRemoteServiceE(new Callback<Integer>() {

                        @Override
                        public void call(Integer f5) {
                            // we have f2 and f5 now need to compose with others
                            System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5));
                            // set to thread-safe variable accessible by external scope 
                            f5Value.set(f5);
                            latch.countDown();
                        }

                    }, f2));
                }

            }));

            /* we must wait for all callbacks to complete */
            latch.await();
            System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get()));
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        try {
            run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static final class CallToRemoteServiceA implements Runnable {

        private final Callback<String> callback;

        private CallToRemoteServiceA(Callback<String> callback) {
            this.callback = callback;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call("responseA");
        }
    }

    private static final class CallToRemoteServiceB implements Runnable {

        private final Callback<Integer> callback;

        private CallToRemoteServiceB(Callback<Integer> callback) {
            this.callback = callback;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(100);
        }
    }

    private static final class CallToRemoteServiceC implements Runnable {

        private final Callback<String> callback;
        private final String dependencyFromA;

        private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) {
            this.callback = callback;
            this.dependencyFromA = dependencyFromA;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call("responseB_" + dependencyFromA);
        }
    }

    private static final class CallToRemoteServiceD implements Runnable {

        private final Callback<Integer> callback;
        private final Integer dependencyFromB;

        private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) {
            this.callback = callback;
            this.dependencyFromB = dependencyFromB;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(140);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(40 + dependencyFromB);
        }
    }

    private static final class CallToRemoteServiceE implements Runnable {

        private final Callback<Integer> callback;
        private final Integer dependencyFromB;

        private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) {
            this.callback = callback;
            this.dependencyFromB = dependencyFromB;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(55);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(5000 + dependencyFromB);
        }
    }

    private static interface Callback<T> {
        public void call(T value);
    }
}

共有2个答案

苏弘盛
2023-03-14

根据您的代码。假设远程调用是使用可观察的完成的。

 Observable<Integer>  callRemoveServiceA()  { /* async call */  }

/* .... */

Observable<Integer>  callRemoveServiceE(Integer f2) { /* async call */  }

您想要什么:

  • 调用服务A,然后调用服务B,得到服务A的结果

使用RxJava,您将使用以下代码实现这一点:

Observable<Integer> f3 = callRemoveServiceA() // call serviceA
            // call serviceB with the result of serviceA
            .flatMap((f1) -> callRemoveServiceB(f1)); 


Observable<Integer> f4Andf5 = callRemoveServiceC() // call serviceC
                    // call serviceD and serviceE then build a new value
                    .flatMap((f2) -> callRemoveServiceD(f2).zipWith(callRemoveServiceE(f2), (f4, f5) -> f4 * f5));

// compute the string to display from f3, and the f4, f5 pair
f3.zipWith(f4Andf5, (childF3, childF4Andf5) -> childF3 + " => " + childF4Andf5)
            // display the value
            .subscribe(System.out::println);

这里的重要部分是flapMap和zip(或zipWith)的使用

  • flapMap将一个值转换为另一个可观察的值。这是您的新asynchronos调用。( http://reactivex.io/documentation/operators/flatmap.html)

您可以在此处获得有关flapMap的更多信息:您什么时候在RxJava中使用map vs平面图?

长孙玉泽
2023-03-14

我是引用的关于回调和Java未来的博客文章的原始作者。下面是一个使用flatMap、zip和merge异步进行服务组合的示例。

它获取一个用户对象,然后同时获取社交和个性化目录数据,然后针对个性化目录中的每个视频,同时获取书签、分级和元数据,将它们拉到一起,并将所有响应合并为一个渐进流输出,作为服务器发送的事件。

return getUser(userId).flatMap(user -> {
    Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user)
            .flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
                    video -> {
                        Observable<Bookmark> bookmark = getBookmark(video);
                        Observable<Rating> rating = getRatings(video);
                        Observable<VideoMetadata> metadata = getVideoMetadata(video);
                        return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m));
                    }));

    Observable<Map<String, Object>> social = getSocial(user).map(s -> {
        return s.getDataAsMap();
    });

    return Observable.merge(catalog, social);
}).flatMap(data -> {
    String json = SimpleJson.mapToJson(data);
    return response.writeStringAndFlush("data: " + json + "\n");
});

此示例可以在以下位置的功能应用程序上下文中看到:https://github.com/Netflix/ReactiveLab/blob/952362b89a4d4115ae0eecf0e73f273ecb27ba98/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/routes/RouteForDeviceHome.java#L33

因为我不可能在这里提供所有的信息,你也可以在https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014?slide=32.找到演示形式的解释(带有视频链接)

 类似资料:
  • 我正在努力理解如何合并两个可观察对象并利用它们合并后的产品。我在mergeMap、switchMap、flatMap、大理石图等上看了无数视频,但我仍然不知道合并观测值是如何工作的。我觉得在使用RxJS的时候,我不会有效率,甚至不会正确。 我有一个要订阅的可观测值,我还想订阅代码中特定表单数组的valueChanges可观测值。但是,我需要确保只有在正确构建表单数组之后才能进行第二次订阅,否则将出

  • 问题内容: 在此博客中,他为回调地狱提供了此示例(复制/粘贴以下代码)。但是,没有提到如何通过使用Reactive Extensions来解决此问题。 因此,此处F3取决于F1完成,而F4和F5取决于F2完成。 想知道Rx中的功能等同于什么。 如何在Rx中表示F1,F2,F3,F4和F5应该全部异步拉? 注意: 我目前正在尝试绕过Rx,所以在提出这个问题之前,我没有尝试解决此示例。 问题答案: 我

  • 我面临的问题是我需要一个身份验证令牌来创建我的Retrofit服务。我目前使用可观察来获取所述令牌,导致一个相当丑陋的可观察构造: 我忍不住觉得这不是应该做的。我说得对吗?

  • 我目前在Android和Kotlin上使用RxJava,但我有一个问题,如果不使用toBlocking(),我无法解决。 我在员工服务中有一个方法,它返回一个可观察的 这一切都很好,因为每当员工发生变化时,这个可观察对象就会发出新的员工列表。但是我想从员工那里生成一个PDF文件,这显然不需要每次员工更改时都运行。另外,我想从PDF生成器方法返回一个可完成的对象。我想在PDF中添加一个标题,然后遍历

  • 我有一个返回用户对象的角度服务。用户对象有自己的属性和一组墙。服务向调用组件返回一个可观察的。在该服务中,我能够从http服务返回的json创建用户对象。但是,当我订阅组件中的服务时,返回的对象为null。我做错了什么?

  • 问题内容: 如果我有一个对象,希望能够观察其他几个可观察对象,而不是所有相同类型的对象。例如,我希望A能够观察B和C。B和C完全无关,除了它们都实现Observable之外。 显而易见的解决方案是仅在update方法中使用“ if instanceof”,但很快就会变得混乱,因此我想知道是否还有其他方法? 问题答案: 与以前的建议类似,您可以将更新更改为。 这样您可以添加一种方法 对于您要观察的每