我以以下方式使用rxjava进行维护任务:
在需要定期维护的类中,我使用以下静态订阅,这会导致在将类加载到内存中时首次启动可观察,然后按照指定的时间间隔定期启动。
private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<String>>() {
@Override public Observable<String> call(Long aLong) {
// some code
return Observable.just(null);
}
}).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread())
.subscribe();
现在我遇到了一种情况,我想向UI报告我的维护结果。
通常,我会使用以下模式
Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) {
// some code
subscriber.onNext(result);
subscriber.onCompleted(); }
}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override public void call(String result) {
// write to the UI
}
});
但是这里我们有一个可观察的,它只执行一次。
对于一个定期执行的可观察的,我找不到在订阅者中调用Action的方法,这样我就可以使用subscriber.onNext()传递结果。看起来可观察的没有合适的签名,它可能会花费定时器()的很长时间,同时允许订阅动作。但是知道rxjava,我肯定有一个技巧;-)
我可以使用zip来压缩Timer可观察和一次性可观察(基本上将两个版本压缩在一起),但我宁愿使用第一个结构,因为它的行为略有不同。
--
我尝试通过以下方式将两个版本合并为一个:
private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<String>>() {
@Override public Observable<String> call(Long aLong) {
// some code // stays here to ensure there is no concurrency while executing
final String result = "result"; // I store the result in a final variable after some code has been finished
return Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(result); // then I use it in a new Observable and emit it
subscriber.onCompleted(); // not sure if this is needed here (haven't tested this yet)
}
});
}
}).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread())
.subscribe(new Action1<String>() {
@Override public void call(String result) {
// so I can finally consume the result on the UI thread
}
});
我创建了一个允许我向订阅者发送结果的对象,而不是创建并发出一个“null”可观察对象。
很乱,但这应该行得通,对吧?有更简单的解决方案吗?你有什么想法?
我知道这是很久以前才给这个问题添加答案的。然而,它可能会帮助其他人。据我所知,您希望创建一个observable并运行它,例如每3秒运行一次。因此,请在下面找到我的示例(您可以发出一个现成的操作符,如Observable.just,甚至可以使用Observable.create操作符创建您自己的Observable):
Observable.interval(3,TimeUnit.SECONDS).flatMap(new Function<Long, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Long aLong) throws Exception {
return Observable.just("Hello " + Thread.currentThread().getName()+aLong);
/*
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
try{
String data = fetchData("http://www.yahoo.com");
emitter.onNext(data.length()+"");
emitter.onComplete();
}catch (Exception e){
emitter.onError(e);
}
}
});
*/
}
}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("RxResult","-- " + Thread.currentThread().getName() + " -- " +s);
}
});
public String fetchData(String url) throws IOException {
BufferedReader in = new BufferedReader(new InputStreamReader(new URL(url).openStream()));
String inputLine;
String result = "";
while((inputLine = in.readLine()) != null){
result += inputLine;
}
return result.length()+"";
}
我不是舒尔,无法理解您的问题。
您想从您的可观察计时器和其他可观察计时器使用相同的订阅吗?
也许你可以用一个主题作为你的观察对象之间的桥梁。
Subject<?, ?> bridge = PublishSubject.create();
Observable.timer(5, SECONDS).flatMap(/* whatever*/).mergeWith(bridge).subscribe(/* toDo */);
Observable.create(/* subscription */).subscribe(bridge);
你的“一次性可观测”发出的每一项都会被推到brige中,brige会将其推到“计时器”可观测中。
这就是你要找的吗?
有人能向我解释一下为什么运算符可以接受返回或的函数吗? 官方文件说: FlatMap运算符通过将您指定的函数应用于源可观察对象发出的每个项目来转换可观察对象,其中该函数返回本身发出项目的可观察对象。 为什么它也可以返回数组? 例如,它们都是有效的: 但这不起作用:
假设我有一个返回列表的博客帖子api 从列表创建可观察 将每个可观察拆分为
我是RxJava的新手,我知道flatmaps用于将发出的项映射到可观察项。我还知道,基于文档,发出的可观测数据都被合并(扁平化)为一个单一的可观测数据流。 我在想,如果这些内在的可观察到的东西都完成了,会发生什么? 例如:我有一个可观察的,它发出一个项数据键。我必须进行另一个异步http调用才能从服务器获取项数据,所以我使用另一个Observable调用它。我使用一个平面地图来连接这两个,并创建
我来自同步编程背景,我很难理解可观察性。 这是我的服务/提供商的摘录(离子2项目) 我将从订阅它。关于这一点,我有几个问题。 > 即使我没有声明,上面的代码是否返回一个可观察/观察者? 响应是JSON。如何检查/处理JSON并执行一些操作,如 那就做吧 我认为应该在提供者类中完成。只是一个典型的提示/例子将是真棒。 当请求到达subscribe方法时,它是否真的发生了? 创建和返回Angular
我正在使用mat-table,并试图将MatTableDataSource与observable一起使用(我从web服务获取数据),但我不知道如何将MatTableDataSource配置为使用observable而不是数组。 这个问题的唯一解决方案是订阅ngOnInit方法中的observable并在新数据到达时始终创建新的MatTableDataSource吗?
问题:我有一个功能,用户可以输入一个查询字符串,我创建了两个可观察对象,一个用于查询本地DB,另一个用于从API获取结果。这两个操作必须并行运行。我需要尽快显示DB中的结果,当API结果返回时,我需要进行检查以删除本地结果中的重复项。 我的方法:CombineTest似乎是最接近我需要的东西。但问题是,只有当两个可见光都发出结果时,它才会发出。我想我要找的是CombineLatest和Concat