当前位置: 首页 > 面试题库 >

当我在FlowableOnSubscribe类中调用onNext时,订户的onNext和onComplete函数无法运行

欧金鹏
2023-03-14
问题内容

在使用RxJava 2的Android项目中,我FlowableonCreate初始活动中创建了一个类似的代码:

Flowable.create(new MyFlowableOnSubscribe1(), BackpressureStrategy.BUFFER)
        .doOnComplete(new MyAction())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new MySubscriber());

FlowableOnSubscribe的实现是:

public class MyFlowableOnSubscribe1 implements FlowableOnSubscribe<String> {
    public static final String TAG = "XX MyFlOnSub1";

    @Override
    public void subscribe(FlowableEmitter<String> emitter) {
        Log.i(TAG, "subscribe");

        emitter.onNext("hello");
        emitter.onComplete();
    }
}

这是订户实现:

public class MySubscriber implements Subscriber<String> {
    public static final String TAG = "XX MySubscriber";

    @Override
    public void onSubscribe(Subscription s) {
        Log.i(TAG, "onSubscribe");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: " + s);
    }
}

动作实现是:

public class MyAction implements Action {
    public static final String TAG = "XX MyAction";

    @Override
    public void run() {
        Log.i(TAG, "run");
    }
}

在我的输出中,我期望从发出一条日志语句onNext,但没有看到一条。相反,这是我的整个输出:

02-23 17:56:31.334 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run

这表明onNext从不运行,onComplete甚至都不运行。但是MyAction运行成功。

当我注释掉对以下内容的调用时,会发生以下情况onNext

02-23 17:58:31.572 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
02-23 17:58:31.652 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onComplete

在这种情况下onNext,当然不会运行,但至少会onComplete运行。

我希望onComplete在两种情况下都能看到运行,并onNext在致电时运行emitter.onNext。我在这里做错了什么?


问题答案:

您需要手动发出请求,否则Subscriber直接扩展时将不会发出任何数据:

@Override
public void onSubscribe(Subscription s) {
    Log.i(TAG, "onSubscribe");
    s.request(Long.MAX_VALUE);
}

或者,您可以扩展DisposableSubscriberResourceSubscriber



 类似资料:
  • 我们发现,在调用主题上的onNext事件后10-20 ms内订阅序列化PublishSubject时;未调用新订阅者的onNext。 在下面的代码片段中;要观察的值[1]被给出为“2000”,并且在使用值为1998[2]的主题上调用onNext()后调用订阅对象();我们看到,如果间隔为10ms,新订阅者将错过主题触发的值2000;然而,如果间隔为50ms或更大,那么新订阅者似乎会收到预期值;这是

  • 所以我有一个。我想知道每个下游订户调用何时发出,这既是为了测量每个下游订户花费的时间,也是为了反压。 让我半途而废--我可以用自己的包装每个单独的订阅服务器,如下面的示例所示。当所有下游订户完成下一个调用时,它不会通知我,而我自己也不必做一些记账。 提前感谢!

  • 不幸的是,这种情况在生产中偶尔发生,但我无法可靠地复制。 gRPC服务器向少数客户端发送小而频繁的更新。每个客户端使用不同的参数对同一个调用发出多个请求。永远不会有来自服务器的数据流。 调用onNext时,出现以下错误: 根据gRPC订阅数据的请求,保留观察员名单。多个客户端可能会请求完全相同的更新,因此每个进入的客户端都会被添加到相应数据的列表中。 如果他们取消,他们将通过以下方式从列表中删除:

  • 我是RxJava新手。我想从给定集合中下载每个TempoAccount实体的一些数据,并将其存储在map accountsWithProjects中。当上一个onNext(TempoAccount TempoAccount)的代码完成时,我想调用filterAccountsWithProjects(accountsWithProjects)方法。有什么简单的方法可以实现吗? 问题:在上面的代码中,

  • 例: 注意:是非Android 运行环境, 使用的是RxJava2.x

  • 我正在用RxJava实现一个事件总线(RxBus)。 RxBus。Java语言 从RecyclerView的viewHolder发布事件 从片段订阅事件 问题是:当我点击或时,只被调用一次(这是正确的),但是被多次调用(甚至不是常量时间)。请参阅下面的日志。 我怎样才能让它只打一次电话? 编辑:我发现如果这次被调用N次,下次当我单击该项目时将被调用N 1次。似乎可观察对象正在向订阅者发出历史记录中