当前位置: 首页 > 知识库问答 >
问题:

用RxJava实现Room并进行改造

柴辰阳
2023-03-14

我试图使用RxJava和Rome的改型,在你建议使用组件拱之前(在这个机会是不可能的,项目是在和50%和只需要继续与拱清理)。

所以问题是这个。我有一个返回POJO的web服务。类似于这样:

{
 "success":"true",
 "message":"message",
 "data":{[
   "id":"id",
   "name":"name",
   "lname":"lname",
 ]} 
}
    null
  • 相互作用者
  • 回调

演示文稿

  • 演示者
  • 查看
{
 "success":"true",
 "message":"message",
 "data":{[
   "id":"id",
   "name":"name",
   "address":"address",
   "phone":"phone",
 ]} 
}
@Entity(tableName = "clients")
    public class clients {

    String id;
    String name;
    String address;
    String phone;
    String status;


    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}
@Dao
public interface ClientsDao {

     @Insert(onConflict = OnConflictStrategy.REPLACE)
     void saveAll(List<Clients> clients);

     @Query("SELECT * FROM Clients")
     Flowable<List<Clients>> listClients();

}
public class RxHelper {
private static final String TAG = RxHelper.class.getName();

@NonNull
public static <T>Observable<T> getObserbable(@NonNull final Call<T> reponse){

    return Observable.create(new ObservableOnSubscribe<T>() {
        @Override
        public void subscribe(final ObservableEmitter<T> emitter) throws Exception {

            reponse.enqueue(new Callback<T>() {
                @Override
                public void onResponse(Call<T> call, Response<T> response) {

                    if (!emitter.isDisposed()) {
                        emitter.onNext(response.body());
                    }
                }

                @Override
                public void onFailure(Call<T> call, Throwable t) {
                    if (!emitter.isDisposed()) {
                        emitter.onError(t);
                    }
                }
            });

        }
    });

}
}
public Observable<ResponseClients> getApiClients(){
        String token = preferences.getValue(SDConstants.token);
        return RxHelper.getObserbable(apiNetwork.getClients(token));
}
@Override
public Observable<ResponseClients> listClients() {
    return factory.listClients();
}

共有1个答案

松景铄
2023-03-14

我不使用room,但熟悉rxjava,您可以这样设计存储库

您的房间interfac

@Query(“SELECT * FROM Users WHERE id = :userId”)
Single<User> getUserById(String userId);

当使用时:
可能当数据库中没有用户并且查询没有返回行时,可能会完成。

public Single<User> getUserById(String userId){
 return  db.getUserById(userId)
              /// if there is no user in the database get data from api
             .onErrorResumeNext(api.getUserById(userId)
              .subscribeOn(Schedulers.io())
              //check your request
              .filter(statusPojo::getStatus)
               // save data to room
              .switchMap(data -> {
              //sava data to db
              return Observable.just(data)
              })
           );

}

更多细节:您可以向存储库注入Api和DB

reactive db的update_Answer如果您想要获得UI上的最新更新,只需执行以下操作:

您的房间界面:

@Query(“SELECT * FROM Users WHERE id = :userId”)
Flowable<User> getUserById(String userId);
   @Override
public Flowable<User> getUser(int id) {
    getUserFromNet(id);
         //first emit cache data in db and after request complete   emit last update from net 
        return db.getUserById(id);

 }


 private Flowable<User> getUserFromNet(int id){
      api.getUserById(userId)
          .subscribeOn(Schedulers.io())
          .observeOn(Schedulers.io())
          //check your request
          .filter(statusPojo::getStatus)
           // save data to room
          .subscribe(new DisposableObserver<User>() {
                @Override
                public void onNext(User user) {
                     // save data to room
                }

                @Override
                public void onError(Throwable e) {
                    Timber.e(e);
                }

                @Override
                public void onComplete() {


                }
            });
}
private Flowable<List<User>>  getUser(int id){
       return db.getUserById(id).
         /// if there is no user in the database get data from 
           .flatMp(userList-> 
           if(userList.size==0)
          api.getUserById(userId)
          .subscribeOn(Schedulers.io())
          //check your request
          .filter(statusPojo::getStatus)
           // save data to room
          .subscribe(new DisposableObserver<User>() {
                @Override
                public void onNext(User user) {
                     // save data to room
                }

                @Override
                public void onError(Throwable e) {
                    Timber.e(e);
                }

                @Override
                public void onComplete() {


                }
            });
                return Flowable.just(data)
                );
}
@Dao
abstract class UserDao   {

@Query("SELECT * FROM users ")
abstract fun findAll(): DataSource.Factory<Int, User>
}

存储库:

private fun getFromDB(pageSize:Int): Flowable<PagedList<User>> {
    return RxPagedListBuilder(userDao.findAll(), pageSize)
        .buildFlowable(BackpressureStrategy.LATEST)
}


private fun getApi(page: Int,pageSize: Int): Disposable {
    return api.getUserList("token", page = page,perPage = pageSize)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe { t1: List<User>?, t2: Throwable? ->
            t1?.let {
                if (it.isNotEmpty())
                    userDao.insert(it)
            }
        }
}

override fun  findAll(page: Int ,pageSize:Int ): 
Flowable<PagedList<User>> {
    return getFromDB(pageSize).doOnSubscribe { getApi(page,pageSize) }
}
 类似资料:
  • 首先,如果我对Rxjava的理解有错,请告诉我。 既然我们已经使用API查询使用referfit过滤了数据,那么对已经获得的数据进行操作有什么需要 为什么在RXJava中使用客户接口对象?因此,我们将其用于观察者或onNext()的位置。 Rxjava是否也有助于检索目的,或者它只是对已经获得的数据进行操作?

  • 如何使用RxJava和Kotlin中的改型创建api调用的泛型类? 我的JAVA实现是:: 首先添加Gradle依赖项:(更新到最新版本(如果可用)) //用于改装 实现“com.squareup.reverfit2:reverfit:2.3.0” 实现“com.squareup.reverfit2:converter-gson:2.3.0” //对于拦截器实现“com.squareup.okht

  • 在网络请求中使用和有什么好处。我见过许多使用的示例,但我想明白为什么。 示例情形: 为每个工作单元创建一个新线程。将使用线程池 但这种争论对应用程序有什么影响呢?还有哪些方面?

  • 试图学习房间和RXJAVA。我已经理解了大约80%的内容,但我仍然无法弄清楚其余的内容。 这是我在插入数据时得到的错误。 Java语言lang.NullPointerException:尝试调用接口方法“void com”。实例学习室。实体道。在空对象引用上插入(com.example.learnroom.Entitys)“” 如果我不运行try catch,我会得到以下错误,这似乎是相关的。 J

  • 我已经用RxJava成功地完成了一个小型Java程序。代码为: 使用此代码,一切正常。现在我正在尝试将此代码传递给Android: 在finished()方法中,我正在更新GUI(finishedListener是当前活动正在实现的接口)。 我在使用map(I)的线路上遇到错误- 内置。gradle(用于应用程序)我正在使用: 我如何解决这个问题?

  • 我在我的项目中使用RxJava3和Room,但我得到了以下错误 错误:不确定如何将游标转换为此方法的返回类型(io.reactivex.rxjava3.core.Flowable 下面是我得到错误的DAO接口方法 我想可能是因为我在成绩档案中使用了以下依赖项: 我试图找到RxJava 3的上述依赖项,但我找不到它。 我想知道如何将RxJava 3与Room一起使用,或者我应该在我的项目中使用RxJ