当前位置: 首页 > 知识库问答 >
问题:

RxJava2 subscribe会在一段时间后停止观察,但在flowable完成后会继续

薛鹏飞
2023-03-14

我很难理解以下代码示例的行为;

    Flowable<String> f = Flowable.just(1)
            .flatMap(it -> Flowable.create(e -> {

                for(int i = 1; i < 1001; ++i) {
                    log.info("Emitting: " + i);
                    if(i % 10 == 0) {
                        Thread.sleep(1000);
                    }
                    e.onNext(i);
                }

                e.onComplete();
            }, BackpressureStrategy.BUFFER))
            .map(String::valueOf)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.newThread());

    f.subscribe(val -> {
        Thread.sleep(100);
        log.info("Observing: " + val);
    });

    Thread.sleep(1000000);

代码正常工作,直到subscribe调用观察到128项。发射和观察是平行的。但在那之后,Flowable继续发射项目(显然在某处排队),但直到所有1000个项目发射完毕,才观察到任何项目。发射完所有1000个项目后,剩下的项目(

这看起来与128的背压缓冲区大小有关,但我仍然希望发射和观察在整个1000个项目中是平行的,因为观察者显然不比发射者慢。我有什么遗漏吗?我应该怎么做来修复代码?

共有1个答案

东郭海阳
2023-03-14

这是由于create和subscribeOn之间存在相同的池死锁造成的:

如果链中有create(FlowableOnSubscribe,BackPressureStragy)类型source up,建议使用subscribeOn(scheduler,false)来避免相同的池死锁,因为请求可能会堆积在急切/阻塞的发射器后面。

//...
.subscribeOn(Schedulers.io(), false)
//...

编辑:

我通过替换Flowable尝试了最初的示例(以及您建议的修复)。创建一个可流动的。但我没有遇到任何问题。你能举一个可能出现问题的例子吗?

Flowable.range(1, 10)
    .subscribeOn(Schedulers.io(), false)
    .doOnNext(v -> System.out.println(Thread.currentThread().getName()))
    .observeOn(Schedulers.single(), false, 1)
    .blockingSubscribe();

这将首先打印RxCachedThreadScheduler-1,然后打印RxSingleScheduler-19次,因为观察的补给请求将在单个调度程序上运行,而不是路由回io调度程序。请尝试使用SubbeOn true。

 类似资料:
  • 触发spring boot REST服务后,该服务可以正常运行数小时,所有请求都可以正常工作,没有任何问题。发生的是,一段时间后,它随机地停止了。在查看日志时,我没有发现任何错误,除了应用程序已被销毁的信息。 一段时间后的日志 Maven依赖项 对于为什么spring boot REST API可能会停止有什么想法吗?我的maven依赖关系是根据演示的--而且它正在成功运行--这就是为什么服务在随

  • 我正在尝试使用Spring批处理和Spring集成在SFTP服务器中上传多个文件。为此,我使用ThreadPoolTaskExector进行并行处理。 在每个进程中执行文件上传,但是即使所有的文件都在SFTP服务器上成功上传,仍然没有停止进程,程序总是保持运行状态。 即使我重写了JobExefftionListener

  • 我有4个Kafka和debezium一起运行。经过几天的良好运行后,三台kafka机器脱离网络一段时间,在< code > connect distributed . out 日志文件中,我收到了许多包含以下错误的消息: 我有4台Kafka机器,经纪人从0到3 动物园管理员: <代码>192.168.240.70 关注我的 - 除了之外,有相同的 指向安装 Kafka 的计算机的相同 IP,并且

  • 我有6个集装箱在码头群中运行。Kafka Zookeeper、MongoDB、A、B、C和接口。接口是来自公共的主要访问点-只有这个容器发布端口-5683。接口容器在启动期间连接到A、B和C。我使用docker组合文件docker堆栈部署,每个服务都有一个名称,用作接口的主机。一切都开始顺利,运转良好。过了一段时间(20分钟、1小时……),我无法向接口提出请求。接口接收到我的请求,但应用程序与服务

  • 问题内容: 在我的组织中,我们有许多Redis工作人员来完成我们的关键任务。通常,一天一次或两次,我们的工人会停止处理队列。 该代码基本上如下所示: 如果看到的话,就代码而言,发生的事情并不多,但是每隔一段时间,队列就会开始建立,并且工作程序不会从队列中弹出任何项目。为设置超时根本没有用,因为我们假设问题出在Redis客户端连接上。 目前,我们已经建立了一些侦听器,这些侦听器会在队列建立时提醒我们

  • 我想做的是允许我的应用程序在后台音频应用程序播放音频时使用说话。当我的应用程序正在讲话时,我希望后台应用程序的音频“变暗”,然后在我的应用程序结束讲话后返回其原始音量。 在我的类中,我初始化我设置的像这样: 每当我想说一个新的话语时,我都会做以下事情。我遵循了AVSpeechSynthesizer问题的建议,有解决办法吗?每次创建一个新的AVSpeechSynthesizer,以确保总是收到取消(