当前位置: 首页 > 知识库问答 >
问题:

缓冲区为空时如何停止PublishSubject的发射

慕容嘉荣
2023-03-14

我正在尝试使用RxJava创建一个顺序下载服务。用户可以批量添加项目(20、30等)或单个项目。这些项目将被添加到队列中,然后以10个批次的顺序下载。为此,我使用PublishSubject:

PublishSubject<Int> pubSubject = PublishSubject.create();

它发出用户添加的项(ID),然后将缓冲区操作符应用于批处理项。使用这些ID,可以在flatMap中下载项目,并在订阅的onNext中返回。

  pubSubject.buffer(1, TimeUnit.SECONDS, 10)
            .observeOn(Schedulers.io())
            .flatMap { idsBatch -> downloadByIds(idsBatch) }
            .subscribe(
                /* onNext */ { apiResponse -> handleResponse() },
                /* onError */ { handleError(it) },
                /* onComplete*/ { hideProgressBar() }
             )

代码大部分按预期工作。项目已成功批处理并下载,但即使在发出所有项目后,buffer仍会使用空列表调用flatMap,并且永远不会调用onComplete()。

我想知道当缓冲区中没有更多项目时,RxJava中是否有任何方法或方法可以获得on完成回调。因为否则我的下载服务永远不会终止。

共有1个答案

易弘阔
2023-03-14

您可以使用takeWhile操作:

返回一个可观察的,只要每个项目满足指定条件,它就会发出源可观察的发出的项目,然后在不满足此条件时立即完成。

pubSubject.buffer(1, TimeUnit.SECONDS, 10)
          .observeOn(Schedulers.io())
          .takeWhile { idsBatch -> idsBatch.isNotEmpty() }
          .flatMap { idsBatch -> downloadByIds(idsBatch) }
          .subscribe(
              /* onNext */ { apiResponse -> handleResponse() },
              /* onError */ { handleError(it) },
              /* onComplete*/ { hideProgressBar() }
           )
 类似资料:
  • 我有一个如下定义的原型模式, 现在,根据proto3官方文档,默认值不会序列化以在有线传输期间节省空间。但在我的情况下,我想接收客户端是否已显式设置字段

  • 问题内容: 在编写用于OpenGL库的Matrix类时,我遇到了一个问题,即使用Java数组还是使用Buffer策略存储数据(JOGL为Matrix操作提供直接缓冲区复制)。为了对此进行分析,我编写了一个小型性能测试程序,该程序比较了Arrays vs Buffers和Direct Buffers上循环和批量操作的相对速度。 我想在这里与您分享我的结果(因为我发现它们很有趣)。请随时发表评论和/或

  • 问题内容: 我在linux上有一个Java应用程序,它可以打开UDP套接字并等待消息。 在高负载下运行了几个小时之后,有一个数据包丢失,即数据包被内核接收,但不是由我的应用程序接收(我们在嗅探器中看到丢失的数据包,在netstat中看到UDP数据包丢失,我们没有看到这些数据包)在我们的应用日志中)。 我们尝试扩大套接字缓冲区,但这并没有帮助-我们早些时候就开始丢失数据包,仅此而已。 对于调试,我想

  • 问题内容: 当我从互联网上加载视频(10-40MB大)时,无法提供流畅的播放体验。 我的AVPlayer要么加载整个视频然后播放,要么播放1s,缓冲然后停止播放。 我尝试了无尽的库,缓冲区观察者方法和教程。似乎没有任何帮助。 问题答案: 从 iOS 10.x开始 ,您可以进行一些缓冲设置,例如,您可以决定缓冲视频需要多少秒:

  • 我有一个我想要定义的方法,叫做FindAll,它不需要参数。普罗托克在抱怨。 应为类型名。 这是针对行: rpc findAll()返回(BenchmarksList);

  • 我已经在网上搜索了几天这种现象,我可以找到一个类似问题的片段,这表明可能与底层的InputStreamReader和/或StreamDecoder有关,但这开始超出了我的专业知识。(见本链接) 所以我的问题是,我是否正确地实现了BufferedReader,以及如何解决我所看到的问题,以便在没有不必要的延迟的情况下获得每一行。