当前位置: 首页 > 知识库问答 >
问题:

将异步调用转换为RxJava,如何在处理完所有项目后手动完成?

王凌
2023-03-14

我有一个自定义类,用于将单个文件或多个文件上传到Firebase存储:

public class FirebaseUploader {
    private Context appContext;
    private FirebaseStorage storage;

    public FirebaseUploader(Context appContext) {
        this.appContext = appContext;
        this.storage = FirebaseStorage.getInstance();
    }

    // Uploading a single file
    public Observable<String> send(Uri file) {
        return toObservable(Observable.just(file));
    }

    // Uploading multiple files
    public Observable<String> send(List<Uri> files) {
        return toObservable(Observable.fromIterable(files));
    }

    private Observable<String> toObservable(Observable<Uri> observable) {
        return observable.flatMap(this::upload);
    }

    private Observable<String> upload(Uri file) {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                StorageReference branch = storage.getReference().child("images");
                StorageReference leaf = branch.child(generateUniqueId(file));
                UploadTask uploadTask = leaf.putFile(file);

                uploadTask.continueWithTask(new Continuation<UploadTask.TaskSnapshot, Task<Uri>>() {
                    @Override
                    public Task<Uri> then(@NonNull Task<UploadTask.TaskSnapshot> task) throws Exception {
                        if (!task.isSuccessful()) emitter.onError(task.getException());
                        return leaf.getDownloadUrl();
                    }
                }).addOnCompleteListener(new OnCompleteListener<Uri>() {
                    @Override
                    public void onComplete(@NonNull Task<Uri> task) {
                        if (task.isSuccessful()) {
                            Uri downloadUri = task.getResult();
                            String url = downloadUri.toString();
                            emitter.onNext(url);
                            //emitter.onComplete();
                        } else {
                            // Handle failures
                            // ...
                        }
                    }
                });
            }
        });
    }

    private String generateUniqueId(Uri file) {
        String fileType = appContext.getContentResolver().getType(file);
        String extension = (fileType.equals("image/jpeg")) ? ".jpg" : ".png";
        String fileName = UUID.randomUUID().toString();
        return fileName + extension;
    }
}

实际上,我花了几天时间才想出这个解决方案,因为我最初不知道如何将Firebase异步调用转换为RxJava。然后我被困了很长一段时间,想知道为什么可观察的没有完成,直到我最终意识到我在调用emitter.onNextemitter.onError,但是没有<--plhd-2/>完成在我的代码中的任何地方!

所以发射器。onComplete位于OnCompleteListener内。但它被注释掉了——这不是该打电话的地方。我一直在寻找一种调用的方法。只有在处理完所有文件后,才能在发射器上执行onComplete()。由于我对RxJava的经验有限,我看到的唯一方法是首先计算要上传的文件数量,然后在达到该数量时手动完成序列。难道没有更好的办法吗?


共有1个答案

唐利
2023-03-14

upload方法返回的可观察对象只负责一件事:上传Uri参数指定的单个文件。该任务在UploadTask完成时完成,因此此时应调用onComplete

flatMap操作员负责将所有这些结果收集到一个可观察的中。上传所有文件后,它将向下游发出onComplete,这正是您想要的。

 类似资料:
  • 问题内容: 如标题所示。我该怎么做呢? 我想在forEach循环遍历每个元素并完成一些异步处理后调用。 有可能使它像这样工作吗?当forEach的第二个参数是一个回调函数,该函数一旦经过所有迭代便会运行? 预期产量: 问题答案: 不能提供这种效果(如果可以的话),但是有几种方法可以实现您想要的效果: 使用一个简单的计数器 (由于@vanuan等),这种方法可确保在调用“完成”回调之前处理所有项目。

  • 有没有一种方法可以让发出数据,并且当它发出数据时(或者当它完成时)以异步方式使用该数据触发? 我的问题与此非常相似,但我试图异步调用Completable。 这是我试图实现的一个微不足道的例子: 我看到订阅映射中的Completable允许我以某种方式完成这项任务,但我不知道如何以正确的方式处理它(因为在活动中不调用此方法)。 非常感谢您的帮助,谢谢!

  • 我的完成处理程序有问题。下面是一个带有完成处理程序的函数,位于一个实用程序文件中: 我在视图控制器中调用它 输出清楚地表明该函数在运行该块之前没有等待完成: 我如何解决这个问题?

  • 问题内容: 我知道callable的调用可能会将异常抛出给调用它的父方法,而runnable则不是这种情况。 我不知道如何,因为它是线程方法,并且是线程堆栈的最底层方法。 问题答案: 的要点是将异常抛出到调用线程,例如,当您获得提交的结果时。

  • 问题内容: 我在NodeJS中有一个forEach循环,遍历一系列键,然后从Redis异步检索其值。循环和检索完成后,我想返回该数据集作为响应。 我目前的问题是因为数据检索是异步的,发送响应时没有填充我的数组。 如何在我的forEach循环中使用promise或回调,以确保响应与数据一起发送? 问题答案: 我在这里使用Bluebird Promise 。请注意,代码的意图非常清晰并且没有嵌套。 首

  • 问题内容: 使用所有可用内存后,Redis将如何处理XADD?是否会从流中删除最旧的项目,并添加新的项目?添加后,旧项目仍会存在于AOF文件中吗?它会抛出错误而不添加新项吗?我应该期待什么? 问题答案: 流是所有其他人一样的数据结构,这样的Redis将尊重并在RAM中的压力的情况下。根据策略,新的写请求将被拒绝,或者现有密钥(是否存在流)将被驱逐。 在https://redis.io/topics