当前位置: 首页 > 知识库问答 >
问题:

使用RxJava将多个项目从可观察到的对象组合到新对象

符功
2023-03-14

我正在尝试实现某种流式解析器。假设我有整数的stream,我将它们组合起来创建新的object,它聚合了stream的一部分。

例如,当integer为负数时,object为“done”。为了保持简单,生成的项目将是一串数字。

下面是一个简单的例子:

Source: 1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11
Output: "1-2", "343-5", "6-7", "8910-11"
rx.Observable<Integer> src = rx.Observable
    .from(Arrays.asList(1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11));

rx.Observable<String> res = src
    .lift(new rx.Observable.Operator<String, Integer> () {

        @Override
        public rx.Subscriber<? super Integer> call(
            final rx.Subscriber<? super String> subscriber) {

            return new rx.Subscriber<Integer>(subscriber) {

                private final StringBuilder cur = new StringBuilder();

                @Override
                public void onCompleted() {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onCompleted();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onError(e);
                    }
                }

                @Override
                public void onNext(Integer i) {
                    if (subscriber.isUnsubscribed())
                        return;

                    cur.append(i).append('');
                    if (i < 0) {
                        subscriber.onNext(cur.toString());
                        cur.delete(0, cur.length());
                    }
                }
            };
        }
    });

rx.observers.TestSubscriber<String> sub =
    new rx.observers.TestSubscriber<>();
res.subscribe(sub);
List<String> l1 = sub.getOnNextEvents();
rx.Observable<String> res2 = res.take(2);

看起来每个输入onNext都应该调用output onNext。

有什么想法吗?

共有1个答案

哈朗
2023-03-14

嗯。写长篇文章确实有助于带来新的想法,而且很管用。(我撞上墙已经两天了)。

生成observer 并发出observer.empty()直到对象就绪。flatmap之后。

下面是工作示例:

rx.Observable<rx.Observable<String>> res = src
    .lift(new rx.Observable.Operator<rx.Observable<String>, Integer> () {

        @Override
        public rx.Subscriber<? super Integer> call(
            final rx.Subscriber<? super rx.Observable<String>> subscriber) {

            return new rx.Subscriber<Integer>(subscriber) {

                private final StringBuilder cur = new StringBuilder();

                // onError, onCompleted skipped

                @Override
                public void onNext(Integer i) {
                    rx.Observable<String> out;
                    cur.append(i);
                    if (i < 0) {
                        out = rx.Observable.just(cur.toString());
                        cur.delete(0, cur.length());
                    } else {
                        out = rx.Observable.<String>empty();
                    }
                    subscriber.onNext(out);
                }
            };
        }
    });

rx.Observable<String> res2 = res
    .flatMap(stringObservable -> stringObservable)
    .take(2);
 类似资料:
  • 我正在开发利用RxJava、realm和改进的应用程序。 我需要创建非常具体的数据处理链。我需要在io调度程序上执行改装调用,然后在我的自定义单线程领域调度程序上处理提供的数据,然后将结果推送到主线程调度程序上的ui。我试图通过使用多个组合来实现这一点,包括观察(observeOn)和订阅(subscribeOn),但我无法让中间部分在调度程序(scheduler)上执行。 我的目标是这样的

  • 我正在从事一个涉及Hystrix的项目,我决定使用RxJava。现在,忘记Hystrix的其余部分,因为我相信主要问题是我完全搞砸了正确编写可观察代码。 需要:我需要一种方法来返回一个代表多个可观察对象的可观察对象,每个可观察对象都运行一个用户任务。我希望该可观察对象能够返回任务的所有结果,甚至错误。 问题:可观测流会因错误而消亡。如果我有三个任务,而第二个任务引发了一个异常,那么即使第三个任务成

  • 我有两个可观察到的。它们都是可观察的类型 一种是冷的,称为初始值可观察(initialValueObservable),它通过可观察(Observable)从项目列表中发出。from()。 另一个是名为“valueUpdateObservable”的热门主题,它是一个发布主题,在出现新项目时通知订阅者。 在客户端中,我想同时订阅这两个,因此我从和发布的更新中获取初始值。我最初的方法是合并它们,但我

  • 我面临的问题是我需要一个身份验证令牌来创建我的Retrofit服务。我目前使用可观察来获取所述令牌,导致一个相当丑陋的可观察构造: 我忍不住觉得这不是应该做的。我说得对吗?

  • 我正在开发一个简单的REST应用程序,它利用RxJava向远程服务器发送请求(1)。对于REST API的每个传入请求,都会向(1)发送一个请求(使用RxJava和RxNetty)。一切正常,但现在我有了一个新的用例: 为了不让太多的请求轰炸(1),我需要实施速率限制。解决这个问题的一种方法(我假设)是将在向(1)发送请求时创建的每个可观察的(2)添加到另一个执行实际速率限制的(2)中。(2) 然

  • 我正在开发一个功能,我需要根据本地数据库数据过滤掉网络响应数据。 例如,我的网络层返回一个项目列表,我的数据库层返回一个可观察的ID列表。现在,我只想从网络层返回那些id与数据库层响应中的任何人都匹配的对象。 下面的代码从网络获取数据并将结果保存到数据库(缓存)。 我还有一个方法可以返回需要过滤的场馆列表 现在,我如何扩展前面的链,使用getDislikedVinces()observate从网络