我有一个observable,它发出项目并将其上传到服务器。
代码如下:
repository
.getItems()
.doOnComplete(() -> Log.d(TAG, "No items left."))
.flatMapCompletable(item ->
repository
.uploadItems(item)
.onErrorComplete()
.andThen(
deleteTemporaryItem()
.onErrorComplete()
)
);
getItems方法逐个发出项目,然后完成,uploadItems方法将它们上载到服务器。问题是,当没有任何项目all chain onComplete事件正常运行,并且我的所有订阅者都获得此事件并继续它时,但当有一些项目并且所有项目都已上载到onComplete事件时,不会超过。doOnComplete(()-
有人能帮忙找出这种行为的原因吗?
提前感谢!
请看一下此示例:
我通过每个步骤传递项目,以便订阅将收到已处理的每个项目的通知。处理管道涉及上载和删除文件。
请尝试更改实现并发布输出日志。
@Test
void name() throws Exception {
Flowable<Integer> completed_work = Flowable.just(1, 2, 3)
.map(integer -> integer * 1000)
.flatMapSingle(integer ->
Completable.fromAction(() -> {
Thread.sleep(integer);
// do upload stuff here
})
.doOnComplete(() -> System.out.println("Uploaded file ...."))
//.timeout(10, TimeUnit.SECONDS)
.retry(3)
.andThen(
Completable.fromAction(() -> {
// do delete stuff...
})
.retry(2)
//.timeout(10, TimeUnit.SECONDS)
.doOnComplete(() -> System.out.println("Deleted file ..."))
)
.toSingle(() -> integer)
)
.doOnComplete(() -> System.out.println("Completed work"));
completed_work.test()
.await()
.assertResult(1000, 2000, 3000);
}
我经历了这个问题,也经历了这个问题。但他们都没有帮助。有人能让我知道我做错了什么吗?
我想要一个完整的未来,只发出完成的信号(例如,我没有返回值)。 我可以将CompletableFuture实例化为: 但是我应该向完整的方法提供什么呢?例如,我不能做
问题内容: 由于某种原因,我完成后没有被调用。 我的班级干事: 我的onPostExecute(): 一切正常,我成功完成并返回一个布尔值,但随后就结束了。 谢谢 问题答案: 您是否在UI线程上创建了AsyncTask?还要在onPostExecute()方法上添加一个@Override注释,以确保正确声明了它。
当我使用完全的未来。allOf()组合javadoc中描述的独立可完成的未来,在提供给该方法的所有未来之后,它不能可靠地完成。例如。: 结果如下: 我希望日志“Joined”和“Completed allOf”写在“Completed f1”和“Completed f2”之后。为了让事情变得更加混乱,阵列中的未来顺序似乎是头等大事。如果我换了台词 到 结果输出更改为: 更糟糕的是,如果我多次运行完
我正在使用kafka和elasticsearch设置flink流处理器。我想重播我的数据,但当我将并行度设置为1以上时,它不会完成程序,我认为这是因为Kafka流只看到一条消息,将其标识为流的结尾。 有没有办法告诉flink消费群中的所有线程在一个线程完成后立即结束?
我使用spring Boot2、jpa和Hibernate。Db是后置,我尝试删除一个带有子级的对象 如果我删除了Samplings,Samplings、testsamplings和Compressions也应该被删除。 从sample_letter=?的示例中删除和sampling_id=?和sampling_year=? 2018-10-03 22:21:05.832错误14511--[nio