在使用RxJava 2的Android项目中,我Flowable
在onCreate
初始活动中创建了一个类似的代码:
Flowable.create(new MyFlowableOnSubscribe1(), BackpressureStrategy.BUFFER)
.doOnComplete(new MyAction())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MySubscriber());
FlowableOnSubscribe的实现是:
public class MyFlowableOnSubscribe1 implements FlowableOnSubscribe<String> {
public static final String TAG = "XX MyFlOnSub1";
@Override
public void subscribe(FlowableEmitter<String> emitter) {
Log.i(TAG, "subscribe");
emitter.onNext("hello");
emitter.onComplete();
}
}
这是订户实现:
public class MySubscriber implements Subscriber<String> {
public static final String TAG = "XX MySubscriber";
@Override
public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: " + s);
}
}
动作实现是:
public class MyAction implements Action {
public static final String TAG = "XX MyAction";
@Override
public void run() {
Log.i(TAG, "run");
}
}
在我的输出中,我期望从发出一条日志语句onNext
,但没有看到一条。相反,这是我的整个输出:
02-23 17:56:31.334 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
这表明onNext
从不运行,onComplete
甚至都不运行。但是MyAction
运行成功。
当我注释掉对以下内容的调用时,会发生以下情况onNext
:
02-23 17:58:31.572 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
02-23 17:58:31.652 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onComplete
在这种情况下onNext
,当然不会运行,但至少会onComplete
运行。
我希望onComplete
在两种情况下都能看到运行,并onNext
在致电时运行emitter.onNext
。我在这里做错了什么?
您需要手动发出请求,否则Subscriber
直接扩展时将不会发出任何数据:
@Override
public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe");
s.request(Long.MAX_VALUE);
}
或者,您可以扩展DisposableSubscriber
或ResourceSubscriber
。
我们发现,在调用主题上的onNext事件后10-20 ms内订阅序列化PublishSubject时;未调用新订阅者的onNext。 在下面的代码片段中;要观察的值[1]被给出为“2000”,并且在使用值为1998[2]的主题上调用onNext()后调用订阅对象();我们看到,如果间隔为10ms,新订阅者将错过主题触发的值2000;然而,如果间隔为50ms或更大,那么新订阅者似乎会收到预期值;这是
所以我有一个。我想知道每个下游订户调用何时发出,这既是为了测量每个下游订户花费的时间,也是为了反压。 让我半途而废--我可以用自己的包装每个单独的订阅服务器,如下面的示例所示。当所有下游订户完成下一个调用时,它不会通知我,而我自己也不必做一些记账。 提前感谢!
不幸的是,这种情况在生产中偶尔发生,但我无法可靠地复制。 gRPC服务器向少数客户端发送小而频繁的更新。每个客户端使用不同的参数对同一个调用发出多个请求。永远不会有来自服务器的数据流。 调用onNext时,出现以下错误: 根据gRPC订阅数据的请求,保留观察员名单。多个客户端可能会请求完全相同的更新,因此每个进入的客户端都会被添加到相应数据的列表中。 如果他们取消,他们将通过以下方式从列表中删除:
我是RxJava新手。我想从给定集合中下载每个TempoAccount实体的一些数据,并将其存储在map accountsWithProjects中。当上一个onNext(TempoAccount TempoAccount)的代码完成时,我想调用filterAccountsWithProjects(accountsWithProjects)方法。有什么简单的方法可以实现吗? 问题:在上面的代码中,
例: 注意:是非Android 运行环境, 使用的是RxJava2.x
我正在用RxJava实现一个事件总线(RxBus)。 RxBus。Java语言 从RecyclerView的viewHolder发布事件 从片段订阅事件 问题是:当我点击或时,只被调用一次(这是正确的),但是被多次调用(甚至不是常量时间)。请参阅下面的日志。 我怎样才能让它只打一次电话? 编辑:我发现如果这次被调用N次,下次当我单击该项目时将被调用N 1次。似乎可观察对象正在向订阅者发出历史记录中