我正在尝试实现某种流式解析器。假设我有整数的stream,我将它们组合起来创建新的object,它聚合了stream的一部分。
例如,当integer为负数时,object为“done”。为了保持简单,生成的项目将是一串数字。
下面是一个简单的例子:
Source: 1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11
Output: "1-2", "343-5", "6-7", "8910-11"
rx.Observable<Integer> src = rx.Observable
.from(Arrays.asList(1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11));
rx.Observable<String> res = src
.lift(new rx.Observable.Operator<String, Integer> () {
@Override
public rx.Subscriber<? super Integer> call(
final rx.Subscriber<? super String> subscriber) {
return new rx.Subscriber<Integer>(subscriber) {
private final StringBuilder cur = new StringBuilder();
@Override
public void onCompleted() {
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}
@Override
public void onNext(Integer i) {
if (subscriber.isUnsubscribed())
return;
cur.append(i).append('');
if (i < 0) {
subscriber.onNext(cur.toString());
cur.delete(0, cur.length());
}
}
};
}
});
rx.observers.TestSubscriber<String> sub =
new rx.observers.TestSubscriber<>();
res.subscribe(sub);
List<String> l1 = sub.getOnNextEvents();
rx.Observable<String> res2 = res.take(2);
看起来每个输入onNext都应该调用output onNext。
有什么想法吗?
嗯。写长篇文章确实有助于带来新的想法,而且很管用。(我撞上墙已经两天了)。
生成observer
并发出observer.empty()
直到对象就绪。flatmap
之后。
下面是工作示例:
rx.Observable<rx.Observable<String>> res = src
.lift(new rx.Observable.Operator<rx.Observable<String>, Integer> () {
@Override
public rx.Subscriber<? super Integer> call(
final rx.Subscriber<? super rx.Observable<String>> subscriber) {
return new rx.Subscriber<Integer>(subscriber) {
private final StringBuilder cur = new StringBuilder();
// onError, onCompleted skipped
@Override
public void onNext(Integer i) {
rx.Observable<String> out;
cur.append(i);
if (i < 0) {
out = rx.Observable.just(cur.toString());
cur.delete(0, cur.length());
} else {
out = rx.Observable.<String>empty();
}
subscriber.onNext(out);
}
};
}
});
rx.Observable<String> res2 = res
.flatMap(stringObservable -> stringObservable)
.take(2);
我正在开发利用RxJava、realm和改进的应用程序。 我需要创建非常具体的数据处理链。我需要在io调度程序上执行改装调用,然后在我的自定义单线程领域调度程序上处理提供的数据,然后将结果推送到主线程调度程序上的ui。我试图通过使用多个组合来实现这一点,包括观察(observeOn)和订阅(subscribeOn),但我无法让中间部分在调度程序(scheduler)上执行。 我的目标是这样的
我正在从事一个涉及Hystrix的项目,我决定使用RxJava。现在,忘记Hystrix的其余部分,因为我相信主要问题是我完全搞砸了正确编写可观察代码。 需要:我需要一种方法来返回一个代表多个可观察对象的可观察对象,每个可观察对象都运行一个用户任务。我希望该可观察对象能够返回任务的所有结果,甚至错误。 问题:可观测流会因错误而消亡。如果我有三个任务,而第二个任务引发了一个异常,那么即使第三个任务成
我面临的问题是我需要一个身份验证令牌来创建我的Retrofit服务。我目前使用可观察来获取所述令牌,导致一个相当丑陋的可观察构造: 我忍不住觉得这不是应该做的。我说得对吗?
我有两个可观察到的。它们都是可观察的类型 一种是冷的,称为初始值可观察(initialValueObservable),它通过可观察(Observable)从项目列表中发出。from()。 另一个是名为“valueUpdateObservable”的热门主题,它是一个发布主题,在出现新项目时通知订阅者。 在客户端中,我想同时订阅这两个,因此我从和发布的更新中获取初始值。我最初的方法是合并它们,但我
我正在开发一个简单的REST应用程序,它利用RxJava向远程服务器发送请求(1)。对于REST API的每个传入请求,都会向(1)发送一个请求(使用RxJava和RxNetty)。一切正常,但现在我有了一个新的用例: 为了不让太多的请求轰炸(1),我需要实施速率限制。解决这个问题的一种方法(我假设)是将在向(1)发送请求时创建的每个可观察的(2)添加到另一个执行实际速率限制的(2)中。(2) 然
我正在开发一个功能,我需要根据本地数据库数据过滤掉网络响应数据。 例如,我的网络层返回一个项目列表,我的数据库层返回一个可观察的ID列表。现在,我只想从网络层返回那些id与数据库层响应中的任何人都匹配的对象。 下面的代码从网络获取数据并将结果保存到数据库(缓存)。 我还有一个方法可以返回需要过滤的场馆列表 现在,我如何扩展前面的链,使用getDislikedVinces()observate从网络