创建操作 - From

优质
小牛编辑
128浏览
2023-12-01

From

将其它种类的对象和数据类型转换为Observable

from

当你使用Observable时,如果你要处理的数据都可以转换成展现为Observables,而不是需要混合使用Observables和其它类型的数据,会非常方便。这让你在数据流的整个生命周期中,可以使用一组统一的操作符来管理它们。

例如,Iterable可以看成是同步的Observable;Future,可以看成是总是只发射单个数据的Observable。通过显式地将那些数据转换为Observables,你可以像使用Observable一样与它们交互。

因此,大部分ReactiveX实现都提供了将语言特定的对象和数据结构转换为Observables的方法。

from

在RxJava中,from操作符可以转换Future、Iterable和数组。对于Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据。

示例代码

  1. Integer[] items = { 0, 1, 2, 3, 4, 5 };
  2. Observable myObservable = Observable.from(items);
  3. myObservable.subscribe(
  4. new Action1<Integer>() {
  5. @Override
  6. public void call(Integer item) {
  7. System.out.println(item);
  8. }
  9. },
  10. new Action1<Throwable>() {
  11. @Override
  12. public void call(Throwable error) {
  13. System.out.println("Error encountered: " + error.getMessage());
  14. }
  15. },
  16. new Action0() {
  17. @Override
  18. public void call() {
  19. System.out.println("Sequence complete");
  20. }
  21. }
  22. );

输出

  1. 0
  2. 1
  3. 2
  4. 3
  5. 4
  6. 5
  7. Sequence complete

对于Future,它会发射Future.get()方法返回的单个数据。from方法有一个可接受两个可选参数的版本,分别指定超时时长和时间单位。如果过了指定的时长Future还没有返回一个值,这个Observable会发射错误通知并终止。

from默认不在任何特定的调度器上执行。然而你可以将Scheduler作为可选的第二个参数传递给Observable,它会在那个调度器上管理这个Future。

RxJavaAsyncUtil

from func)

此外,在可选包 RxJavaAsyncUtil 中,你还可以用下面这些操作符将actions,callables,functions和runnables转换为发射这些动作的执行结果的Observable:

  • fromAction
  • fromCallable
  • fromFunc0
  • fromRunnable

在这个页面 Start 查看关于这些操作符的更多信息。

from

注意:还有一个可选的StringObservable类中也有一个from方法,它将一个字符流或者一个REader转换为一个发射字节数组或字符串的Observable。

runAsync2

注意:这里与后面start操作符里的runAsync说明重复了

在单独的RxJavaAsyncUtil包中(默认不包含在RxJava中),还有一个runAsync函数。传递一个Action和一个SchedulerrunAsync,它会返回一个StoppableObservable,这个Observable使用Action产生发射的数据项。

传递一个Action和一个SchedulerrunAsync,它返回一个使用这个Action产生数据的StoppableObservable。这个Action接受一个Observable和一个Subscription作为参数,它使用Subscription检查unsubscribed条件,一旦发现条件为真就立即停止发射数据。在任何时候你都可以使用unsubscribe方法手动停止一个StoppableObservable(这会同时取消订阅与这个StoppableObservable关联的Subscription)。

由于runAsync会立即调用Action并开始发射数据,在你创建StoppableObservable之后到你的观察者准备好接受数据之前这段时间里,可能会有一部分数据会丢失。如果这不符合你的要求,可以使用runAsync的一个变体,它也接受一个Subject参数,传递一个ReplaySubject给它,你可以获取其它丢失的数据了。

decode

decode

StringObservable类不是默认RxJava的一部分,包含一个decode操作符,这个操作符将一个多字节字符流转换为一个发射字节数组的Observable,这些字节数组按照字符的边界划分。