我是反应式编程的新手,对组合有依赖关系的可观察对象感到困惑。场景如下:有两个可观察对象A,B。可观察A取决于B发出的值。(因此A需要观察B)。有没有办法创建一个由A和B组成并发出V的可观察C?我只是在RxJava留档中寻找指针。
我认为这取决于你需要做的A和B之间的组合,也取决于A如何依赖于B。
C是否一对接一对地组成A和B对(A1与B1组合,A2与B2组合,等等)-那么,zip将是您想要的函数。但是,在这种情况下,我想知道当您首先将B转换为A时,您是否不能只做这项工作-毕竟我假设您将B逐元素转换为A(在这种情况下,map将是一种方式)。
相反,如果您想为B发出的每个值创建一个新A(但想将所有这些As组合成一个可观察的),那么平面地图
就是您需要的。
如果您确实首先需要B来创建A,然后再次需要它来组合A和B,那么您可能需要cache
B来节省再次计算所有内容的麻烦。
这里可能还有其他感兴趣的功能(如reduce或combinelatetest)。也许你可以提供更多关于你想做什么的细节?
我也是反应式编程的新手,只是整理了一些可能对您的案例感兴趣的代码
A需要遵守B
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.testng.Assert.assertTrue;
public class Q22284380TestCase {
private static final Logger LOGGER = LoggerFactory.getLogger(
Q22284380TestCase.class);
private AtomicBoolean completed = new AtomicBoolean(false);
@Test
public void testName() throws Exception {
final Observable.OnSubscribe<Integer> onSubProduceTwoValues = new Observable.OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> subscriber) {
final Thread thread = new Thread(new Runnable() {
public Integer i = 0;
@Override
public void run() {
final Integer max = 2;
while (i < max) {
subscriber.onNext(i);
i++;
}
subscriber.onCompleted();
}
});
thread.start();
}
};
final Observable<Integer> values = Observable.create(onSubProduceTwoValues);
final Observable<Integer> byTwoMultiplier = values
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer aValue) {
return doubleIt(aValue);
}
});
byTwoMultiplier.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer a) {
LOGGER.info("" + a);
}
@Override
public void onCompleted() {
completed.set(true);
}
@Override
public void onError(Throwable e) {
LOGGER.error(e.getMessage());
}
});
Thread.sleep(1000L);
assertTrue(completed.get());
}
private Observable<Integer> doubleIt(final Integer value) {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> subscriber) {
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
subscriber.onNext(value * 2);
subscriber.onCompleted();
} catch (Throwable e) {
subscriber.onError(e);
}
}
});
thread.start();
}
});
}
}
有了值的生产者,它只需使用flatMap将doubleIt函数应用于输出。要做一些不同的事情,如果你想有一个V是a和B的组合,你可以阅读zip。
你的问题对A如何依赖于B有点模糊,所以我将尝试举几个例子来说明如何组合可观察性。
示例-没有B就无法创建A-使用map()
public class B {
public final int value;
public B(int value) {
this.value = value;
}
}
public class A {
public final B b;
public A(B b) {
this.b = b;
}
}
public Observable<B> createObservableB() {
return Observable.from(new B(0), new B(1), new B(2), new B(3));
}
public Observable<A> createObservableA() {
return createObservableB()
.map(new Func1<B, A>() {
@Override
public A call(B b) {
return new A(b);
}
});
}
示例-每次出现B都可以创建零个或多个A-使用平面图()
public class B {
public final int value;
public B(int value) {
this.value = value;
}
}
public class A {
public final int value;
public A(int value) {
this.value = value;
}
}
public Observable<B> createObservableB() {
return Observable.from(new B(0), new B(1), new B(2), new B(3));
}
public Observable<A> createObservableA() {
return createObservableB()
.flatMap(new Func1<B, Observable<? extends A>>() {
@Override
public Observable<? extends A> call(final B b) {
return Observable.create(new Observable.OnSubscribe<A>() {
@Override
public void call(Subscriber<? super A> subscriber) {
for (int i = 0; i < b.value; i++) {
subscriber.onNext(new A(i));
}
subscriber.onCompleted();
}
});
}
});
}
我不太确定你对可观测值C和V的要求,所以让我们看看更多组合可观测值的方法。
示例-组合两个可观察对象发出的每对项-使用zip()
public class A {
public final int value;
public A(int value) {
this.value = value;
}
}
public class B {
public final int value;
public B(int value) {
this.value = value;
}
}
public class C {
private final A a;
private final B b;
public C(A a, B b) {
this.a = a;
this.b = b;
}
}
public Observable<B> createObservableB() {
return Observable.from(new B(0), new B(1), new B(2), new B(3));
}
public Observable<A> createObservableA() {
return Observable.from(new A(0), new A(1), new A(2), new A(3));
}
public Observable<C> createObservableC() {
return Observable.zip(createObservableA(), createObservableB(),
new Func2<A, B, C>() {
@Override
public C call(A a, B b) {
return new C(a, b);
}
}
);
}
示例-合并两个可观察对象的最后一项-使用组合最新()
// Use the same class definitions from previous example.
public Observable<C> createObservableC1() {
return Observable.combineLatest(createObservableA(), createObservableB(),
new Func2<A, B, C>() {
@Override
public C call(A a, B b) {
return new C(a, b);
}
}
);
}
依赖关系观察器机制允许您将侦听器注册到依赖关系中。功能实际上是Observator模式的实现。当依赖关系改变其状态(UP或DOWN)时,可以应用一些自定义逻辑。 如何激活 Spring Cloud Zookeeper依赖关系功能需要启用从依赖关系观察器机制中获利。 注册听众 为了注册一个监听器,你必须实现一个接口org.springframework.cloud.zookeeper.discove
我需要将另一个改装请求中的图像合并到其特定系列中。听起来很简单的任务把我带进了反应式的地狱,没有一丝希望。 具体来说,我还通过可观察创建服务,因为我需要获取身份验证令牌来创建服务。 我的尝试基于以下答案:https://stackoverflow.com/a/28418503/2192545. 我有点不知所措。我只是在Observable的Func2部分得到了“无法推断函数接口类型”。在IDE中,
我正在学习RxJava/Android(我目前正在将其与网络呼叫改造相结合),现在我有一个问题,假设我有6个不同的可观察对象,如下所示:<代码>可观察 apiInterface等为改造客户端,GetClient等为调用 现在,我如何异步执行这6个不同的调用,以及当所有6个调用都完成时-
我正在尝试实现某种流式解析器。假设我有整数的stream,我将它们组合起来创建新的object,它聚合了stream的一部分。 例如,当integer为负数时,object为“done”。为了保持简单,生成的项目将是一串数字。 下面是一个简单的例子: 看起来每个输入onNext都应该调用output onNext。 有什么想法吗?
我正在尝试开发我的第一个RxJava例子 我有一个带有文本框和三个按钮的主要活动。第一个按钮初始化单独类中的整数。第二个按钮订阅一个可观察量,该可观察量假定正在观察整数。第三个按钮将整数的值减小 1。 这是我的密码 和班级 当我尝试使用 订阅时,它只是给了我 的值(即 6),然后它给了我完成! 然后我尝试使用,认为我需要使用,只是而不是,但后来我得到了一个返回的空的,然后再次完成! 有人能帮助我从
我在创建一个将返回对象列表的可观察对象时遇到了麻烦。我有一个ID列表,想对我的数据库提出一个请求。在这种情况下,我使用的是Firebase。当得到一个结果时,我希望将这些对象中的每一个编译成一个列表,然后返回该列表。我需要在返回之前等待所有的对象都返回。我在我的视图模型反序列化器类中这样做。这是我的代码。 有几种方法可以从firebase数据库中返回数据,我可以返回Documentsnapshot