RxJava Documentation and Tutorials - RxJava Usage Examples


Here you can find examples in several JVM languages, one for each language adaptor:



    import rx.Observable;
    import rx.functions.Action1;

    public static void hello(String... names) {
        Observable.from(names).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("Hello " + s + "!");
            }
        });
    }

    hello("Ben", "George");

    Hello Ben!
    Hello George!


    def hello(String[] names) {
        Observable.from(names).subscribe { println "Hello ${it}!" }
    }

    hello("Ben", "George")

    Hello Ben!
    Hello George!


    (defn hello
      ; takes a single collection of names
      [names]
      (-> (Observable/from names)
          (.subscribe #(println (str "Hello " % "!")))))

    (hello ["Ben" "George"])

    Hello Ben!
    Hello George!


    import rx.lang.scala.Observable

    def hello(names: String*) {
      Observable.from(names) subscribe { n =>
        println(s"Hello $n!")
      }
    }

    hello("Ben", "George")

    Hello Ben!
    Hello George!




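To create an Observable, you can either implement its behavior yourself by passing a function to create( ), or use one of the creation operators to convert an existing data structure into an Observable.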


You can use the just( ) and from( ) methods to convert objects, lists, or arrays of objects into Observables that emit those objects:

    // each variable gets its own name so the three snippets can run in one script
    Observable<String> letters = Observable.from("a", "b", "c");

    def list = [5, 6, 7, 8]
    Observable<Integer> numbers = Observable.from(list);

    Observable<String> single = Observable.just("one object");


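Creating an Observable with create( )

With the create( ) method you can build your own Observable, for example one that performs asynchronous I/O, runs a computation, or even emits an infinite stream of data.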


    /**
     * This example shows a custom Observable that blocks
     * the current thread when subscribed to.
     */
    def customObservableBlocking() {
        return Observable.create { aSubscriber ->
            50.times { i ->
                if (!aSubscriber.unsubscribed) {
                    aSubscriber.onNext("value_${i}")
                }
            }
            // after sending all values we complete the sequence
            if (!aSubscriber.unsubscribed) {
                aSubscriber.onCompleted()
            }
        }
    }

    // To see output:
    customObservableBlocking().subscribe { println(it) }
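
The reason an emitter checks `unsubscribed` before calling `onNext` can be seen in a small plain-Java model. This is only a sketch under stated assumptions: `CreateSketch`, `SketchSubscriber`, `emitValues`, and `run` are hypothetical names that imitate the shape of the Groovy example above using nothing but the JDK. They are not RxJava classes, and the real library does considerably more.

```java
import java.util.ArrayList;
import java.util.List;

public class CreateSketch {

    /** Hypothetical stand-in for a subscriber; it unsubscribes itself after `limit` items. */
    public static final class SketchSubscriber {
        public final List<String> received = new ArrayList<>();
        public boolean completed = false;
        private boolean unsubscribed = false;
        private final int limit;

        public SketchSubscriber(int limit) {
            this.limit = limit;
        }

        public boolean isUnsubscribed() {
            return unsubscribed;
        }

        public void onNext(String value) {
            received.add(value);
            if (received.size() >= limit) {
                unsubscribed = true; // the consumer has seen enough
            }
        }

        public void onCompleted() {
            completed = true;
        }
    }

    /** Mirrors the body passed to create( ): emit up to 50 values, then complete. */
    public static void emitValues(SketchSubscriber subscriber) {
        for (int i = 0; i < 50; i++) {
            if (subscriber.isUnsubscribed()) {
                return; // stop producing as soon as nobody is listening
            }
            subscriber.onNext("value_" + i);
        }
        // after sending all values we complete the sequence
        if (!subscriber.isUnsubscribed()) {
            subscriber.onCompleted();
        }
    }

    /** Convenience helper: run the emitter against a subscriber with the given limit. */
    public static SketchSubscriber run(int limit) {
        SketchSubscriber subscriber = new SketchSubscriber(limit);
        emitValues(subscriber);
        return subscriber;
    }

    public static void main(String[] args) {
        System.out.println(run(3).received);
        System.out.println(run(Integer.MAX_VALUE).received.size());
    }
}
```

In this model a subscriber that stops listening after three items receives exactly three values and no completion signal, while a subscriber that never unsubscribes receives all 50 values followed by `onCompleted`.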


The following example uses Groovy to create an Observable that emits 75 strings.


    /**
     * This example shows a custom Observable that does not block
     * when subscribed to as it spawns a separate thread.
     */
    def customObservableNonBlocking() {
        return Observable.create({ subscriber ->
            Thread.start {
                for (i in 0..<75) {
                    if (subscriber.unsubscribed) {
                        return
                    }
                    subscriber.onNext("value_${i}")
                }
                // after sending all values we complete the sequence
                if (!subscriber.unsubscribed) {
                    subscriber.onCompleted()
                }
            }
        } as Observable.OnSubscribe)
    }

    // To see output:
    customObservableNonBlocking().subscribe { println(it) }


    (defn customObservableNonBlocking
      "This example shows a custom Observable that does not block
      when subscribed to as it spawns a separate thread.
      returns Observable<String>"
      []
      (Observable/create
        (fn [subscriber]
          (let [f (future
                    (doseq [x (range 50)] (-> subscriber (.onNext (str "value_" x))))
                    ; after sending all values we complete the sequence
                    (-> subscriber .onCompleted))]))))

    ; To see output
    (.subscribe (customObservableNonBlocking) #(println %))


    ; assumes an HTTP client aliased as http whose get returns a map with a :body key
    (defn fetchWikipediaArticleAsynchronously
      "Fetch a list of Wikipedia articles asynchronously.
      return Observable<String> of HTML"
      [wikipediaArticleNames]
      (Observable/create
        (fn [subscriber]
          (let [f (future
                    (doseq [articleName wikipediaArticleNames]
                      (-> subscriber (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
                    ; after sending response to onNext we complete the sequence
                    (-> subscriber .onCompleted))]))))

    (-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"])
        (.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")))


    /*
     * Fetch a list of Wikipedia articles asynchronously.
     */
    def fetchWikipediaArticleAsynchronously(String... wikipediaArticleNames) {
        return Observable.create { subscriber ->
            Thread.start {
                for (articleName in wikipediaArticleNames) {
                    if (subscriber.unsubscribed) {
                        return
                    }
                    subscriber.onNext(new URL("http://en.wikipedia.org/wiki/${articleName}").text)
                }
                if (!subscriber.unsubscribed) {
                    subscriber.onCompleted()
                }
            }
        }
    }

    fetchWikipediaArticleAsynchronously("Tiger", "Elephant")
        .subscribe { println "--- Article ---\n${it.substring(0, 125)}" }


    --- Article ---
    <!DOCTYPE html>
    <html lang="en" dir="ltr" class="client-nojs">
    <head>
    <title>Tiger - Wikipedia, the free encyclopedia</title> ...
    --- Article ---
    <!DOCTYPE html>
    <html lang="en" dir="ltr" class="client-nojs">
    <head>
    <title>Elephant - Wikipedia, the free encyclopedia</tit ...

Note that all of the above examples ignore error handling, for brevity. See below for examples that include error handling.

More information can be found on the Observable and Creating Observables pages.





The following example, in Groovy, uses a previously defined, asynchronous Observable that emits 75 items, skips over the first 10 of these (skip(10)), then takes the next 5 (take(5)), and transforms them (map(...)) before subscribing and printing the items:


    /**
     * Asynchronously calls 'customObservableNonBlocking' and defines
     * a chain of operators to apply to the callback sequence.
     */
    def simpleComposition() {
        customObservableNonBlocking().skip(10).take(5)
            .map({ stringValue -> return stringValue + "_xform"})
            .subscribe({ println "onNext => " + it})
    }


    onNext => value_10_xform
    onNext => value_11_xform
    onNext => value_12_xform
    onNext => value_13_xform
    onNext => value_14_xform


RxJava Usage Examples - Figure 1
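
For readers who know `java.util.stream`, the skip/take/map chain in the Groovy example above has a close analogue in the JDK. The sketch below is only an analogy, not RxJava code: streams are pulled synchronously by the caller, whereas an Observable pushes items to its subscriber, possibly from another thread. `CompositionSketch` and `transformed` are hypothetical names, and `limit` plays the role that `take` plays in Rx.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CompositionSketch {

    /** Builds the same 75 strings as the Groovy Observable, then applies the operator chain. */
    public static List<String> transformed() {
        return IntStream.range(0, 75)
                .mapToObj(i -> "value_" + i)
                .skip(10)                 // drop value_0 .. value_9
                .limit(5)                 // keep the next five (Rx calls this take)
                .map(s -> s + "_xform")   // transform each remaining item
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        transformed().forEach(s -> System.out.println("onNext => " + s));
    }
}
```

Running `main` prints the five lines from `onNext => value_10_xform` through `onNext => value_14_xform`, matching the Groovy output.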


    (defn getVideoForUser
      "Get video metadata for a given userId
       - video metadata
       - video bookmark position
       - user data
      return Observable<Map>"
      [userId videoId]
      (let [user-observable (-> (getUser userId)
                                (.map (fn [user] {:user-name (:name user) :language (:preferred-language user)})))
            bookmark-observable (-> (getVideoBookmark userId videoId)
                                    (.map (fn [bookmark] {:viewed-position (:position bookmark)})))
            ; getVideoMetadata requires :language from user-observable so nest inside flatMap function
            video-metadata-observable (-> user-observable
                                          (.flatMap
                                            ; fetch metadata after a response from user-observable is received
                                            (fn [user-map]
                                              (getVideoMetadata videoId (:language user-map)))))]
        ; now combine 3 observables using zip
        (-> (Observable/zip bookmark-observable video-metadata-observable user-observable
                            (fn [bookmark-map metadata-map user-map]
                              {:bookmark-map bookmark-map
                               :metadata-map metadata-map
                               :user-map user-map}))
            ; and transform into a single response object
            (.map (fn [data]
                    {:video-id videoId
                     :video-metadata (:metadata-map data)
                     :user-id userId
                     :language (:language (:user-map data))
                     :bookmark (:viewed-position (:bookmark-map data))
                     })))))


    {:video-id 78965,
     :video-metadata {:video-id 78965, :title House of Cards: Episode 1,
                      :director David Fincher, :duration 3365},
     :user-id 12345, :language es-us, :bookmark 0}


RxJava Usage Examples - Figure 2

The following example, in Groovy, comes from Ben Christensen’s QCon presentation on the evolution of the Netflix API. It combines two Observables with the merge operator, then uses the reduce operator to construct a single item out of the resulting sequence, then transforms that item with map before emitting it:


    public Observable getVideoSummary(APIVideo video) {
        def seed = [id:video.id, title:video.getTitle()];
        def bookmarkObservable = getBookmark(video);
        def artworkObservable = getArtworkImageUrl(video);
        return( Observable.merge(bookmarkObservable, artworkObservable)
            .reduce(seed, { aggregate, current -> aggregate << current })
            .map({ [(video.id.toString()): it] }))
    }


RxJava Usage Examples - Figure 3
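
To make the data flow of merge, reduce, and map easier to follow, here is a plain-Java sketch that performs the same three steps on ordinary collections. It is an illustration only: `VideoSummarySketch` and `videoSummary` are hypothetical names, the two lists stand in for whatever the bookmark and artwork Observables would emit, and the `position` and `image` keys are assumptions made up for the demo. A real merge may also interleave items from the two sources, which this sequential model does not show.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class VideoSummarySketch {

    /** Models merge + reduce + map with plain collections instead of Observables. */
    public static Map<String, Map<String, Object>> videoSummary(
            int id, String title,
            List<Map<String, Object>> bookmarkItems,
            List<Map<String, Object>> artworkItems) {

        // seed: the starting value of the aggregate
        Map<String, Object> aggregate = new LinkedHashMap<>();
        aggregate.put("id", id);
        aggregate.put("title", title);

        // merge: treat both sources as a single sequence of items
        List<Map<String, Object>> merged = new ArrayList<>(bookmarkItems);
        merged.addAll(artworkItems);

        // reduce: fold every emitted map into the aggregate
        for (Map<String, Object> current : merged) {
            aggregate.putAll(current);
        }

        // map: wrap the single result in a map keyed by the video id
        Map<String, Map<String, Object>> result = new LinkedHashMap<>();
        result.put(String.valueOf(id), aggregate);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> bookmark = new LinkedHashMap<>();
        bookmark.put("position", 0);          // hypothetical bookmark payload
        Map<String, Object> artwork = new LinkedHashMap<>();
        artwork.put("image", "artwork.jpg");  // hypothetical artwork payload
        System.out.println(videoSummary(78965, "House of Cards: Episode 1",
                Arrays.asList(bookmark), Arrays.asList(artwork)));
    }
}
```

The result is a single map keyed by the video id whose value combines the seed fields with everything the two sources contributed.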



    /*
     * Fetch a list of Wikipedia articles asynchronously, with error handling.
     */
    def fetchWikipediaArticleAsynchronouslyWithErrorHandling(String... wikipediaArticleNames) {
        return Observable.create({ subscriber ->
            Thread.start {
                try {
                    for (articleName in wikipediaArticleNames) {
                        if (subscriber.isUnsubscribed()) {
                            return;
                        }
                        subscriber.onNext(new URL("http://en.wikipedia.org/wiki/"+articleName).getText());
                    }
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onCompleted();
                    }
                } catch(Throwable t) {
                    // report the failure instead of letting the thread die silently
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onError(t);
                    }
                }
            }
        });
    }

Notice how the Groovy example above now calls onError(Throwable t) when an error occurs. The following code passes subscribe() a second function that handles the onError notification:

    fetchWikipediaArticleAsynchronouslyWithErrorHandling("Tiger", "NonExistentTitle", "Elephant")
        .subscribe(
            { println "--- Article ---\n" + it.substring(0, 125) },
            { println "--- Error ---\n" + it.getMessage() })
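
What the subscriber actually sees when the second title fails can be modelled in plain Java. This is a sketch under stated assumptions rather than RxJava code: `ErrorHandlingSketch`, `emit`, `fakeFetch`, and `run` are hypothetical names, and `fakeFetch` throws for one title instead of doing real network I/O. It imitates the try/catch in the Groovy emitter above, where the first exception leaves the loop and is delivered to the error callback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ErrorHandlingSketch {

    /**
     * Applies `fetch` to each name and passes the result to onNext.
     * The first exception is routed to onError and ends the sequence.
     */
    public static void emit(List<String> names,
                            Function<String, String> fetch,
                            Consumer<String> onNext,
                            Consumer<Throwable> onError) {
        try {
            for (String name : names) {
                onNext.accept(fetch.apply(name));
            }
        } catch (Throwable t) {
            onError.accept(t);
        }
    }

    /** Hypothetical fetcher that fails for one title instead of doing real I/O. */
    public static String fakeFetch(String name) {
        if (name.equals("NonExistentTitle")) {
            throw new IllegalStateException("not found: " + name);
        }
        return "<html>" + name + "</html>";
    }

    /** Runs the sketch and records every notification as a line of text. */
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        emit(Arrays.asList("Tiger", "NonExistentTitle", "Elephant"),
                ErrorHandlingSketch::fakeFetch,
                article -> log.add("--- Article --- " + article),
                error -> log.add("--- Error --- " + error.getMessage()));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the log contains one article entry for "Tiger" followed by one error entry. "Elephant" is never fetched, because the error ends the sequence.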

See the Error Handling Operators page for more error-handling techniques in RxJava, including methods such as onErrorResumeNext() and onErrorReturn(), which let you recover from errors.


    myModifiedObservable = myObservable.onErrorResumeNext({ t ->
        Throwable myThrowable = myCustomizedThrowableCreator(t);
        return (Observable.error(myThrowable));
    });