当前位置: 首页 > 知识库问答 >
问题:

concat映射后RxJava更改线程

秦涵涤
2023-03-14

你好,RxJava大师,

在我当前的Android项目中,我在使用RxJava和SQLite时遇到了一些死锁问题。我的问题是:

  1. 我在一个线程上启动一个事务
  2. 调用Web服务并将一些东西保存在数据库中
  3. conat映射另一个可观察函数
  4. 尝试在数据库上编写其他东西---

这是我的代码:

//define a scheduler for managing transaction in the same thread
private Scheduler mScheduler = Schedulers.from(Executors.newSingleThreadExecutor());


Observable.just(null)
            /* Go to known thread to open db transaction */
            .observeOn(mScheduler)
            .doOnNext(o -> myStore.startTransaction())
            /* Do some treatments that change thread */
            .someWebServiceCallWithRetrofit()
            /* Return to known thread to save items in db */
            .observeOn(mScheduler)
            .flatMap(items -> saveItems(items))
            .subscribe();

public Observable<Node> saveItems(List<Item> items) {
    Observable.from(items)
            .doOnNext(item -> myStore.saveItem(item)) //write into the database OK
            .concatMap(tab -> saveSubItems(item));
}

public Observable<Node> saveSubItems(Item item) {
    return Observable.from(item.getSubItems())
            .doOnNext(subItem -> myStore.saveSubItems(subItem)) //DEADLOCK thread is different
}

为什么RxJava突然改变了线程?即使我指定了,我也希望他在我自己的时间表上观察。我在saveSubItem之前添加了另一个observeOn,这是一个错误的修复,但这可能不是正确的解决方案。

我知道,当您使用改装调用web服务时,响应会转发到一个新线程(这就是为什么我创建了自己的调度程序以返回到启动sql事务的线程中)。但是,我真的不明白RxJava是如何管理线程的。

非常感谢你的帮助。

共有2个答案

扶文光
2023-03-14

据我所知,doOnNext方法在不同的Thread中调用,而不是它之前的代码,因为它与序列的其余部分异步运行。

示例:您可以执行多个rest调用,将其保存到数据库和doOnNext(…)中 将程序通知视图/演示者/控制器。您可以在保存到数据库之前或/或保存到数据库之后执行此操作。我建议您使用“flatMapping”代码。

因此,saveItems方法如下所示(如果myStore.saveSubItems返回结果):

public Observable<Node> saveSubItems(Item item) {
return Observable.from(item.getSubItems())
        .flatMap(subItem -> myStore.saveSubItems(subItem))
}

使用“平面图”保证操作在与前一个序列相同的线程上运行,并且序列继续,然后flaMap函数结束。

廉博赡
2023-03-14

副作用运算符(与平面图一样)在任何线程调用它时同步执行。尝试类似的操作

Observable.just(null)            
          .doOnNext(o -> myStore.startTransaction())
          .subscribeOn(mScheduler)      // Go to known thread to open db transaction 
            /* Do some treatments that change thread */
          .someWebServiceCallWithRetrofit()                      
          .flatMap(items -> saveItems(items))
          .subscribeOn(mScheduler) // Return to known thread to save items in db
          .observeOn(mScheduler) // Irrelevant since we don't observe anything
          .subscribe();
 类似资料:
  • 问题内容: 如何在RxJava2中将一个发生的异常映射到另一个异常?例如: 在这种情况下,我最终收到包含和的内容,但我只想收到。救命! 问题答案: 您可以使用onErrorResumeNext并从中返回Observable.error(): 编辑 该测试对我来说是合格的:

  • 我知道如何变换

  • 如何转换rxJava中可观察对象返回的错误?现在我有了这个方法: 这种行为是,通过或传递的任何异常都会传播到此函数返回值的订阅者。是否有可能将例外映射到其他东西?就像

  • 我有一个从RESTendpoint返回的列表。我需要将该列表分为多个类别(类别是列表中每个条目中的一个项目)。各个类别将写入缓存,以便以后更快地查找。 我不知道我能不能。映射()条目,并提供多个filter()或某种类型的case语句,以将类别条目放入正确的bucket中。 用rxJava实现这样的东西听起来合理吗? 更新:非工作版本 然而,这些按顺序触发,这是我的理解,第二个可观察对象没有发送任

  • 前缀映射,后缀映射和缓存映射 通过 names.NewPrefixMapper(names.SnakeMapper{}, "prefix") 可以创建一个在 SnakeMapper 的基础上在命名中添加统一的前缀,当然也可以把 SnakeMapper{} 换成 SameMapper 或者你自定义的 Mapper。 例如,如果希望所有的表名都在结构体自动命名的基础上加一个前缀而字段名不加前缀,则可以

  • 问题内容: 我需要一个线程安全映射,我有类似这样的内容:(我对Java很陌生) 问题答案: