当前位置: 首页 > 知识库问答 >
问题:

使用RxJava组合具有依赖关系的异步观察对象

岳安福
2023-03-14

我是反应式编程的新手,对组合有依赖关系的可观察对象感到困惑。场景如下:有两个可观察对象A,B。可观察A取决于B发出的值。(因此A需要观察B)。有没有办法创建一个由A和B组成并发出V的可观察C?我只是在RxJava留档中寻找指针。

共有3个答案

糜淇
2023-03-14

我认为这取决于你需要做的A和B之间的组合,也取决于A如何依赖于B。

C是否一对接一对地组成A和B对(A1与B1组合,A2与B2组合,等等)-那么,zip将是您想要的函数。但是,在这种情况下,我想知道当您首先将B转换为A时,您是否不能只做这项工作-毕竟我假设您将B逐元素转换为A(在这种情况下,map将是一种方式)。

相反,如果您想为B发出的每个值创建一个新A(但想将所有这些As组合成一个可观察的),那么平面地图就是您需要的。

如果您确实首先需要B来创建A,然后再次需要它来组合A和B,那么您可能需要cacheB来节省再次计算所有内容的麻烦。

这里可能还有其他感兴趣的功能(如reduce或combinelatetest)。也许你可以提供更多关于你想做什么的细节?

云利
2023-03-14

我也是反应式编程的新手,只是整理了一些可能对您的案例感兴趣的代码

A需要遵守B

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.testng.Assert.assertTrue;

public class Q22284380TestCase {

    private static final Logger LOGGER = LoggerFactory.getLogger(
            Q22284380TestCase.class);

    private AtomicBoolean completed = new AtomicBoolean(false);

    @Test
    public void testName() throws Exception {

        final Observable.OnSubscribe<Integer> onSubProduceTwoValues = new Observable.OnSubscribe<Integer>() {

            @Override
            public void call(final Subscriber<? super Integer> subscriber) {

                final Thread thread = new Thread(new Runnable() {

                    public Integer i = 0;

                    @Override
                    public void run() {

                        final Integer max = 2;
                        while (i < max) {
                            subscriber.onNext(i);
                            i++;
                        }

                        subscriber.onCompleted();
                    }
                });

                thread.start();
            }
        };

        final Observable<Integer> values = Observable.create(onSubProduceTwoValues);

        final Observable<Integer> byTwoMultiplier = values
                .flatMap(new Func1<Integer, Observable<Integer>>() {

                    @Override
                    public Observable<Integer> call(Integer aValue) {

                        return doubleIt(aValue);

                    }
                });

        byTwoMultiplier.subscribe(new Subscriber<Integer>() {

            @Override
            public void onNext(Integer a) {

                LOGGER.info("" + a);

            }

            @Override
            public void onCompleted() {

                completed.set(true);

            }

            @Override
            public void onError(Throwable e) {

                LOGGER.error(e.getMessage());
            }
        });

        Thread.sleep(1000L);
        assertTrue(completed.get());

    }

    private Observable<Integer> doubleIt(final Integer value) {

        return Observable.create(new Observable.OnSubscribe<Integer>() {

            @Override
            public void call(final Subscriber<? super Integer> subscriber) {

                final Thread thread = new Thread(new Runnable() {

                    @Override
                    public void run() {

                        try {
                            subscriber.onNext(value * 2);
                            subscriber.onCompleted();
                        } catch (Throwable e) {
                            subscriber.onError(e);
                        }

                    }
                });

                thread.start();

            }
        });
    }
}

有了值的生产者,它只需使用flatMap将doubleIt函数应用于输出。要做一些不同的事情,如果你想有一个V是a和B的组合,你可以阅读zip。

海鸣
2023-03-14

你的问题对A如何依赖于B有点模糊,所以我将尝试举几个例子来说明如何组合可观察性。

示例-没有B就无法创建A-使用map()

public class B {
    public final int value;
    public B(int value) {
        this.value = value;
    }
}

public class A {
    public final B b;
    public A(B b) {
        this.b = b;
    }
}

public Observable<B> createObservableB() {
    return Observable.from(new B(0), new B(1), new B(2), new B(3));
}

public Observable<A> createObservableA() {
    return createObservableB()
            .map(new Func1<B, A>() {
                @Override
                public A call(B b) {
                    return new A(b);
                }
            });
}

示例-每次出现B都可以创建零个或多个A-使用平面图()

public class B {
    public final int value;
    public B(int value) {
        this.value = value;
    }
}

public class A {
    public final int value;
    public A(int value) {
        this.value = value;
    }
}

public Observable<B> createObservableB() {
    return Observable.from(new B(0), new B(1), new B(2), new B(3));
}

public Observable<A> createObservableA() {
    return createObservableB()
            .flatMap(new Func1<B, Observable<? extends A>>() {
                @Override
                public Observable<? extends A> call(final B b) {
                    return Observable.create(new Observable.OnSubscribe<A>() {
                        @Override
                        public void call(Subscriber<? super A> subscriber) {
                            for (int i = 0; i < b.value; i++) {
                                subscriber.onNext(new A(i));
                            }
                            subscriber.onCompleted();
                        }
                    });
                }
            });
}

我不太确定你对可观测值C和V的要求,所以让我们看看更多组合可观测值的方法。

示例-组合两个可观察对象发出的每对项-使用zip()

public class A {
    public final int value;
    public A(int value) {
        this.value = value;
    }
}

public class B {
    public final int value;
    public B(int value) {
        this.value = value;
    }
}

public class C {
    private final A a;
    private final B b;
    public C(A a, B b) {
        this.a = a;
        this.b = b;
    }
}

public Observable<B> createObservableB() {
    return Observable.from(new B(0), new B(1), new B(2), new B(3));
}

public Observable<A> createObservableA() {
    return Observable.from(new A(0), new A(1), new A(2), new A(3));
}

public Observable<C> createObservableC() {
    return Observable.zip(createObservableA(), createObservableB(),
            new Func2<A, B, C>() {
                @Override
                public C call(A a, B b) {
                    return new C(a, b);
                }
            }
    );
}

示例-合并两个可观察对象的最后一项-使用组合最新()

// Use the same class definitions from previous example.
public Observable<C> createObservableC1() {
    return Observable.combineLatest(createObservableA(), createObservableB(),
            new Func2<A, B, C>() {
                @Override
                public C call(A a, B b) {
                    return new C(a, b);
                }
            }
    );
}
 类似资料:
  • 依赖关系观察器机制允许您将侦听器注册到依赖关系中。功能实际上是Observator模式的实现。当依赖关系改变其状态(UP或DOWN)时,可以应用一些自定义逻辑。 如何激活 Spring Cloud Zookeeper依赖关系功能需要启用从依赖关系观察器机制中获利。 注册听众 为了注册一个监听器,你必须实现一个接口org.springframework.cloud.zookeeper.discove

  • 我需要将另一个改装请求中的图像合并到其特定系列中。听起来很简单的任务把我带进了反应式的地狱,没有一丝希望。 具体来说,我还通过可观察创建服务,因为我需要获取身份验证令牌来创建服务。 我的尝试基于以下答案:https://stackoverflow.com/a/28418503/2192545. 我有点不知所措。我只是在Observable的Func2部分得到了“无法推断函数接口类型”。在IDE中,

  • 我正在学习RxJava/Android(我目前正在将其与网络呼叫改造相结合),现在我有一个问题,假设我有6个不同的可观察对象,如下所示:<代码>可观察 apiInterface等为改造客户端,GetClient等为调用 现在,我如何异步执行这6个不同的调用,以及当所有6个调用都完成时-

  • 我正在尝试实现某种流式解析器。假设我有整数的stream,我将它们组合起来创建新的object,它聚合了stream的一部分。 例如,当integer为负数时,object为“done”。为了保持简单,生成的项目将是一串数字。 下面是一个简单的例子: 看起来每个输入onNext都应该调用output onNext。 有什么想法吗?

  • 我正在尝试开发我的第一个RxJava例子 我有一个带有文本框和三个按钮的主要活动。第一个按钮初始化单独类中的整数。第二个按钮订阅一个可观察量,该可观察量假定正在观察整数。第三个按钮将整数的值减小 1。 这是我的密码 和班级 当我尝试使用 订阅时,它只是给了我 的值(即 6),然后它给了我完成! 然后我尝试使用,认为我需要使用,只是而不是,但后来我得到了一个返回的空的,然后再次完成! 有人能帮助我从

  • 我在创建一个将返回对象列表的可观察对象时遇到了麻烦。我有一个ID列表,想对我的数据库提出一个请求。在这种情况下,我使用的是Firebase。当得到一个结果时,我希望将这些对象中的每一个编译成一个列表,然后返回该列表。我需要在返回之前等待所有的对象都返回。我在我的视图模型反序列化器类中这样做。这是我的代码。 有几种方法可以从firebase数据库中返回数据,我可以返回Documentsnapshot