我对RxJava很陌生,每当我有一个情况,我需要从链上的一个可观察的返回数据传递到调用“订阅” - 我很难理解如何在没有任何补丁的情况下以“反应式”方式做到这一点......
例如:
Observable<GameObject> obs1 = func1();
Observable<GameObject> obs2 = func2();
Observable<GameObject> obs3 = func3();
Observable<GameObject> obs3 = func4();
我想发出obs1和obs2,得到它们的结果,然后发出obs3然后obs4,然后以订阅结束链,同时可以访问obs1、obs2、obs3和obs4的结果。
调用的顺序很重要,我需要在执行obs3之前完成obs1和obs2。
obs3和obs4也是如此-我需要在执行obs4之前完成obs3。
我该怎么做呢?
我知道这是一个相当容易理解的问题,但当开发人员开始了解rxJava时,这是最有问题的问题之一。
谢了。
由于我不久前也有同样的疑问,因此问题接缝与可观察量如何真正工作有关。
假设您使用以下内容创建了obs1和obs2:可观察
你有两个独立和不相连的流。当它们中的每一个都应该做一些事情,如网络请求或一些密集的后台处理,这可能需要一些时间时,这就是您想要的。
现在,假设您想观察这两个结果并在它们准备好时从它们中发出一个值(您没有明确说过,但它会帮助您了解它是如何工作的)。在这种情况下,您可以使用
zipFor
运算符,它获取一对项目,第一个可观察的第一个项目和第二个可观察的第二个项目,将它们组合成一个项目,并将其发送给链中可能对其感兴趣的下一个项目。zipTo
从一个可观察的调用,并期望另一个可观察的作为参数被压缩。它还需要一个自定义函数,该函数知道如何压缩2个源项并从中创建一个新项。
Observable<CustomObject> obs3 = obs1.zipWith(obs2, new Func2<GameObject, GameObject, CustomObject>() {
@Override
public CustomObject call(GameObject firstItem, GameObject secondItem) {
return new CustomObject(firstItem, secondItem);
}
});
在这种情况下,
CustomObject
只是一个pojo。但它可能是另一个长期运行的任务,或者您需要对前两个可观察项目的结果执行的任何操作。
如果你想等待(或者,观察!)来自< code>obs3
的结果,您可以在末尾插入另一个可观察对象,它应该执行另一项处理。
Observable<FinalResult> obs4 = obs3.map(new Func1<CustomObject, FinalResult>() {
@Override
public FinalResult call(CustomObject customObject) {
return new FinalResult(customObject);
}
});
map
运算符将一个对象转换(或映射)为另一个对象。因此,您可以执行另一项处理或任何数据操作,并从中返回结果。或者您的FinalResult
可能是一个常规类,例如CustomObject
,只是保存对另一个GameObject
s…你命名它。
根据您创建可观察量的方式,它们可能尚未开始发出任何项目。到目前为止,您只是创建并插入数据管道。为了触发第一个任务并使项目在流中流动,您需要订阅它。
obs4.subscribe();
总而言之,您实际上没有一个变量沿着整个链传递。实际上,您在第一个可观察对象中创建了一个项,当它准备好时通知第二个,以此类推。此外,每个步骤(可观察对象)都会以某种方式转换数据。所以,你有一系列的变换。
RxJava遵循函数式方法,对数据应用高阶函数(map、zip、filter、reduce)。清楚这一点至关重要。此外,数据总是不可变的:你不会真的改变一个可观察的,或者改变你自己的对象。它创建它们的新实例,旧对象最终将被垃圾回收。所以< code>obs1.zip(...)不改变< code>obs1,它创建一个新的可观察实例,你可以把它赋给一个变量。
您还可以删除变量赋值(obs1、obs2、obs3等),然后将所有方法链接在一起。所有东西都是强类型的,因此编译器不允许您插入彼此不匹配的可观察值(一个的输出应该与下一个的输入匹配)。
我希望它能给你一些想法!
您可以使用可观察.zip
和简单的可观察性.map
/可观察.flatMap 来做到这一点
:
Observable.zip(obs1, obs2, (res1, res2) -> {
// do stuff with res1, res2
return obs3.flatMap(res3 -> {
// do stuff with res1, res2, res3
return obs4.flatMap(res4 -> {
// do stuff with res1, res2, res3, res4
return result;
});
});
});
这将强制您的日程安排要求:
> < li>
观察点1和2
可观察 3
可观察的4
我正在使用RxJava链接异步操作,我想向下游传递一些变量: 这似乎是一种常见的模式,但我找不到有关它的信息。
我正在开发利用RxJava、realm和改进的应用程序。 我需要创建非常具体的数据处理链。我需要在io调度程序上执行改装调用,然后在我的自定义单线程领域调度程序上处理提供的数据,然后将结果推送到主线程调度程序上的ui。我试图通过使用多个组合来实现这一点,包括观察(observeOn)和订阅(subscribeOn),但我无法让中间部分在调度程序(scheduler)上执行。 我的目标是这样的
我希望能够删除警报,但前提是此警报未分配给用户。为此,我需要获取我的用户列表,并检查是否没有用户分配了此警报。我通过将2个请求与可观察对象链接,成功地实现了这一点,但有更好的方法实现吗?
我看到了很多关于让Rx延迟事件的每次发射的帖子:如何让RxJS可以观测到倒计时?,如何使用RxJava间隔运算符,在可观察项RxJava之间添加延迟,为发出的列表的每个项添加RxJava延迟,以及其他。 不过,我没有看到任何不同延迟的链接。 基本上,我有一个文本视图和一个字母列表,我想: 将文本设置为第一个字母 等待1500ms 将文本设置为null 等待500ms 将文本设置为第二个字母 等待1
Observables 是多个值的惰性推送集合。它填补了下面表格中的空白: 单个值 多个值 拉取 Function Iterator 推送 Promise Observable 示例 - 当订阅下面代码中的 Observable 的时候会立即(同步地)推送值1、2、3,然后1秒后会推送值4,再然后是完成流: var observable = Rx.Observable.create(functio
我有一个组件订阅服务中的一个可观察对象。该方法反过来订阅另一个服务中的可观察对象。我想将一个数组从最后一个服务传递回第一个服务,然后第一个服务将该数组传递回组件。更具体地说,该组件调用其本地服务,然后调用一个数据服务,该数据服务通过http客户端访问我的数据库。http客户端正在工作,数据服务将数组返回给本地服务。本地服务接收数组,但我不知道如何将该数组作为可观察对象传递回组件。以下是简短的代码块