当前位置: 首页 > 知识库问答 >
问题:

如何使用RxJava迭代列表并对第一项执行初始处理

蔡弘扬
2023-03-14

我是RxJava新手,发现它对于Android应用程序中的网络和数据库处理非常有用。

我有两个无法在RxJava中完全实现的用例

用例1

  1. 清除我的目标数据库表表A
  2. 从表B中获取包含关键字字段的数据库记录列表
  3. 对于从表B检索到的每一行,调用远程API并将所有返回的数据持久化到表a中

我管理过的最接近的代码是

final AtomicInteger id = new AtomicInteger(0);

DatabaseController.deleteAll(TableA_DO.class);

DatabaseController.fetchTable_Bs()
        .subscribeOn(Schedulers.io())
        .toObservable()
        .flatMapIterable(b -> b)
        .flatMap(b_record -> NetworkController.getTable_A_data(b_record.getKey()))
        .flatMap(network -> transformNetwork(id, network, NETWORK_B_MAPPER))
        .doOnNext(DatabaseController::persistRealmObjects)
        .doOnComplete(onComplete)
        .doOnError(onError)
        .doAfterTerminate(doAfterTerminate())
        .doOnSubscribe(compositeDisposable::add)
        .subscribe();

用例2

  1. 清除我的目标数据库表Table X
  2. 清除我的目标数据库表Table Y
  3. 从表Z中获取包含键字段的数据库记录列表
  4. 对于从表B检索到的每一行,调用远程API并将一些返回的数据持久化到表X中,其余数据应持久化到表Y中

我没有设法为用例2创建任何代码。

对于这些用例使用RxJava,我有很多问题。

  1. 是否有可能在RxJava中实现我的两个用例?
  2. 将所有这些步骤组合成一个Rx“流”是“最佳实践”吗

使现代化

我最终得到了这个POC测试代码,它似乎是有效的。。。我不确定这是否是最佳解决方案,但我的API调用返回单个,数据库操作返回Completable,所以我觉得这是我的最佳解决方案

public class UseCaseOneA {

    public static void main(final String[] args) {

        login()
        .andThen(UseCaseOneA.deleteDatabaseTableA())
        .andThen(UseCaseOneA.deleteDatabaseTableB())
        .andThen(manufactureRecords())
            .flatMapIterable(x -> x)
            .flatMapSingle(record -> NetworkController.callApi(record.getPrimaryKey()))
            .flatMapSingle(z -> transform(z))
            .flatMapCompletable(p -> UseCaseOneA.insertDatabaseTableA(p))
            .doOnComplete(() -> System.out.println("ON COMPLETE"))
            .doFinally(() -> System.out.println("ON FINALLY"))
            .subscribe();

    }

    private static Single<List<PayloadDO>> transform(final List<RemotePayload> payloads) {
        return Single.create(new SingleOnSubscribe<List<PayloadDO>>() {
            @Override
            public void subscribe(final SingleEmitter<List<PayloadDO>> emitter) throws Exception {
                System.out.println("transform - " + payloads.size());

                final List<PayloadDO> payloadDOs = new ArrayList<>();

                for (final RemotePayload remotePayload : payloads) {
                    payloadDOs.add(new PayloadDO(remotePayload.getPayload()));
                }

                emitter.onSuccess(payloadDOs);
            }
        });
    }

    private static Observable<List<Record>> manufactureRecords() {
        final List<Record> records = new ArrayList<>();
        records.add(new Record("111-111-111"));
        records.add(new Record("222-222-222"));
        records.add(new Record("3333-3333-3333"));
        records.add(new Record("44-444-44444-44-4"));
        records.add(new Record("5555-55-55-5-55-5555-5555"));

        return Observable.just(records);
    }

    private static Completable deleteDatabaseTableA() {

        return Completable.create(new CompletableOnSubscribe() {

            @Override
            public void subscribe(final CompletableEmitter emitter) throws Exception {
                System.out.println("deleteDatabaseTableA");

                emitter.onComplete();
            }
        });
    }

    private static Completable deleteDatabaseTableB() {

        return Completable.create(new CompletableOnSubscribe() {

            @Override
            public void subscribe(final CompletableEmitter emitter) throws Exception {
                System.out.println("deleteDatabaseTableB");

                emitter.onComplete();
            }
        });
    }

    private static Completable insertDatabaseTableA(final List<PayloadDO> payloadDOs) {
        return Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(final CompletableEmitter emitter) throws Exception {
                System.out.println("insertDatabaseTableA - " + payloadDOs);

                emitter.onComplete();
            }
        });
    }

    private static Completable login() {
        return Completable.complete();
    }
}

这段代码并没有满足我所有的用例需求。即能够将远程有效负载记录转换为多种数据库记录类型,并将每种类型插入到其自己的特定目标数据库表中。

我可以调用Remote API两次以获取相同的远程数据项,然后首先转换为一种数据库类型,然后再转换为第二种数据库类型,但是这似乎很浪费。

RxJava中是否有一个操作数可以重用API调用的输出并将其转换为另一种数据库类型?

共有1个答案

牛枫
2023-03-14

您必须以某种方式自己编制项目索引,例如,通过外部计数:

Observable.defer(() -> {
    AtomicInteger counter = new AtomicInteger();

    return DatabaseController.fetchTable_Bs()
        .subscribeOn(Schedulers.io())
        .toObservable()
        .flatMapIterable(b -> b)
        .doOnNext(item -> {
            if (counter.getAndIncrement() == 0) {
                // this is the very first item
            } else {
                // these are the subsequent items
            }
        });
});

延迟是将计数器与内部序列隔离所必需的,以便在必要时仍能重复。

 类似资料:
  • 我对RxJava/RxAndroid还不熟悉,但我一直坚持使用我的用例。 我尝试迭代一个

  • 使用JSF 2.0,我需要显示一个表,其中每一行都包含一个打开弹出窗口的链接。我有两种型号:

  • 我试图迭代一个ArrayList并使用规则中的jboss drools添加到另一个ArrayList。 我的规则如下。 用口水怎么做?

  • 如何拆分列表并并行执行 我的场景-- 我从webservices获得了1000辆使用下面列表的车辆。 我想在每个列表中分割100辆车并并行执行所有列表。 谢谢

  • 简而言之:在我的JSP中,我需要使用C:Foreach jstl迭代一个自定义bean类列表,比如list,MyClass有键和值变量,以及这些变量的getter和setter 细节:类似这样的内容: 在Java代码中,我有: List pairlist=new ArrayList();//MyClass是一个简单的bean类,具有变量“key”和“value”,以及相同的getter和sette

  • 假设数据采集对象如下所示 我需要尝试使用java lambda函数在引擎匹配时获取匹配的品牌对象。 现在,如果发动机类型匹配,我需要获得品牌名称。例如,如果搜索引擎1,则需要返回Honda对象。 下面是代码示例:类Vehicle: 类别品牌: 类车: 类引擎: 班级主修: 以上是我尝试的代码。之前,我的目标是使用授权子对象列表中的关键字进行过滤。 现在,我使用它作为一个键,以父对象或品牌对象作为值