当前位置: 首页 > 知识库问答 >
问题:

使用flatMapCompletable的RxJava 2 Observable未完成

常朗
2023-03-14

我有一个observable,它发出项目并将其上传到服务器

代码如下:

repository
            .getItems()
            .doOnComplete(() -> Log.d(TAG, "No items left."))
            .flatMapCompletable(item ->
                    repository
                            .uploadItems(item)
                            .onErrorComplete()
                            .andThen(
                                    deleteTemporaryItem()
                                            .onErrorComplete()
                            )
            );

getItems方法逐个发出项目,然后完成,uploadItems方法将它们上载到服务器。问题是,当没有任何项目all chain onComplete事件正常运行,并且我的所有订阅者都获得此事件并继续它时,但当有一些项目并且所有项目都已上载到onComplete事件时,不会超过。doOnComplete(()-

有人能帮忙找出这种行为的原因吗?

提前感谢!

共有1个答案

屠德宇
2023-03-14

请看一下此示例:

我通过每个步骤传递项目,以便订阅将收到已处理的每个项目的通知。处理管道涉及上载和删除文件。

请尝试更改实现并发布输出日志。

@Test
void name() throws Exception {
    Flowable<Integer> completed_work = Flowable.just(1, 2, 3)
            .map(integer -> integer * 1000)
            .flatMapSingle(integer ->
                    Completable.fromAction(() -> {
                        Thread.sleep(integer);
                        // do upload stuff here
                    })
                            .doOnComplete(() -> System.out.println("Uploaded file ...."))
                            //.timeout(10, TimeUnit.SECONDS)
                            .retry(3)
                            .andThen(
                                    Completable.fromAction(() -> {
                                        // do delete stuff...
                                    })
                                            .retry(2)
                                            //.timeout(10, TimeUnit.SECONDS)
                                            .doOnComplete(() -> System.out.println("Deleted file ..."))
                            )
                            .toSingle(() -> integer)
            )
            .doOnComplete(() -> System.out.println("Completed work"));


    completed_work.test()
            .await()
            .assertResult(1000, 2000, 3000);
}
 类似资料:
  • 我经历了这个问题,也经历了这个问题。但他们都没有帮助。有人能让我知道我做错了什么吗?

  • 我想要一个完整的未来,只发出完成的信号(例如,我没有返回值)。 我可以将CompletableFuture实例化为: 但是我应该向完整的方法提供什么呢?例如,我不能做

  • 问题内容: 由于某种原因,我完成后没有被调用。 我的班级干事: 我的onPostExecute(): 一切正常,我成功完成并返回一个布尔值,但随后就结束了。 谢谢 问题答案: 您是否在UI线程上创建了AsyncTask?还要在onPostExecute()方法上添加一个@Override注释,以确保正确声明了它。

  • 当我使用完全的未来。allOf()组合javadoc中描述的独立可完成的未来,在提供给该方法的所有未来之后,它不能可靠地完成。例如。: 结果如下: 我希望日志“Joined”和“Completed allOf”写在“Completed f1”和“Completed f2”之后。为了让事情变得更加混乱,阵列中的未来顺序似乎是头等大事。如果我换了台词 到 结果输出更改为: 更糟糕的是,如果我多次运行完

  • 我正在使用kafka和elasticsearch设置flink流处理器。我想重播我的数据,但当我将并行度设置为1以上时,它不会完成程序,我认为这是因为Kafka流只看到一条消息,将其标识为流的结尾。 有没有办法告诉flink消费群中的所有线程在一个线程完成后立即结束?

  • 我使用spring Boot2、jpa和Hibernate。Db是后置,我尝试删除一个带有子级的对象 如果我删除了Samplings,Samplings、testsamplings和Compressions也应该被删除。 从sample_letter=?的示例中删除和sampling_id=?和sampling_year=? 2018-10-03 22:21:05.832错误14511--[nio