当前位置: 首页 > 知识库问答 >
问题:

RxJava可观察到可完成,如何避免toBlocking()

燕扬
2023-03-14

我目前在Android和Kotlin上使用RxJava,但我有一个问题,如果不使用toBlocking(),我无法解决。

我在员工服务中有一个方法,它返回一个可观察的

fun all(): Observable<List<Employee>>

这一切都很好,因为每当员工发生变化时,这个可观察对象就会发出新的员工列表。但是我想从员工那里生成一个PDF文件,这显然不需要每次员工更改时都运行。另外,我想从PDF生成器方法返回一个可完成的对象。我想在PDF中添加一个标题,然后遍历员工并计算每个员工的工资,这也会返回一个可见值,这就是我现在使用Toblock的地方。我目前的做法是:

private fun generatePdf(outputStream: OutputStream): Completable {
    return employeeService.all().map { employees ->
        try {
                addHeaderToPDF()
                for (i in employees) {
                    val calculated = employeeService.calculateWage(i.id).toBlocking().first()
                    // Print calculated to PDF....
                }
                addFooterToPDF()
                return @map Completable.complete()
            }
            catch (e: Exception) {
                return @map Completable.error(e)
            }
        }.first().toCompletable()

有没有什么方法可以使用RxJava让这段代码更干净一点?

提前感谢!

共有2个答案

辛承志
2023-03-14

我想到了这个:

    return employeeService.all().first()
            .doOnSubscribe { addHeaderToPDF() }
            .flatMapIterable { it }
            .flatMap { employeeService.calculateWage(it.id).first() }
            .doOnNext { printEmployeeWage(it) }
            .doOnCompleted { addFooterToPDF }
            .toCompletable()

这是应该这样做吗?:)

班承德
2023-03-14

免责声明:此答案是正在进行的工作。

基本前提:如果流中有阻塞,那么就是做错了。

注意:任何状态都不能离开可观察的lambda。

输入是一个员工流。对于每个员工,你需要获得一份工资。让我们把它变成一条小溪。

/**
 * @param employeesObservable
 * Stream of employees we're interested in.
 * @param wageProvider
 * Transformation function which takes an employee and returns a [Single] of their wage.
 * @return
 * Observable stream spitting individual [Pair]s of employees and their wages.
 */
fun getEmployeesAndWagesObservable(
        employeesObservable: Observable<Employee>,
        wageProvider: Function<Employee, Single<Int>>
): Observable<Pair<Employee, Int>>? {
    val employeesAndWagesObservable: Observable<Pair<Employee, Int>>

    // Each Employee from the original stream will be converted
    // to a Single<Pair<Employee, Int>> via flatMapSingle operator.
    // Remember, we need a stream and Single is a stream.
    employeesAndWagesObservable = employeesObservable.flatMapSingle { employee ->
        // We need to get a source of wage value for current employee.
        // That source emits a single Int or errors.
        val wageForEmployeeSingle: Single<Int> = wageProvider.apply(employee)

        // Once the wage from said source is loaded...
        val employeeAndWageSingle: Single<Pair<Employee, Int> = wageForEmployeeSingle.map { wage ->
            // ... construct a Pair<Employee, Int>
            employee to wage
        }

        // This code is not executed now. It will be executed for each Employee
        // after the original Observable<Employee> starts spitting out items.
        // After subscribing to the resulting observable.
        return@flatMapSingle employeeAndWageSingle
    }

    return employeesAndWagesObservable
}

订阅时会发生什么:

  1. 从源头带走一名员工。
  2. 获取雇员的工资。
  3. 吐出一对员工和他们的工资。

这一过程会重复,直到员工发出可维护的信号,或某个错误出现故障。

使用的运算符:

  • FLAPMapSing:将实际值转换为具有某些转换值的新单一流。
  • map:将实际值转换为其他实际值(没有嵌套流)。

Hee是如何将其连接到代码的:

fun doStuff() {
    val employeesObservable = employeeService.all()
    val wageProvider = Function<Employee, Single<Int>> { employee ->
        // Don't listen to changes. Take first wage and use that.
        employeeService.calculateWage(employee.id).firstOrError()
    }

    val employeesAndWagesObservable = 
            getEmployeesAndWagesObservable(employeesObservable, wageProvider)

    // Subscribe...
}

使用的运算符:

  • 第一:从observable中获取第一项,并将其转换为单个流
  • 超时:最好是。超时(timeout)如果您通过网络获取工资,则为工资

不要订阅,打电话

val blockingIterable = employeesAndWagesObservable.blockingIterable()
blockingIterable.forEach { ... }

并以同步方式处理每个项目。坐下来,找出下一步,观看演示文稿,阅读示例

  • <代码>。映射这些对中的每一对

 类似资料:
  • 我正在尝试开发我的第一个RxJava例子 我有一个带有文本框和三个按钮的主要活动。第一个按钮初始化单独类中的整数。第二个按钮订阅一个可观察量,该可观察量假定正在观察整数。第三个按钮将整数的值减小 1。 这是我的密码 和班级 当我尝试使用 订阅时,它只是给了我 的值(即 6),然后它给了我完成! 然后我尝试使用,认为我需要使用,只是而不是,但后来我得到了一个返回的空的,然后再次完成! 有人能帮助我从

  • 我正在从事一个涉及Hystrix的项目,我决定使用RxJava。现在,忘记Hystrix的其余部分,因为我相信主要问题是我完全搞砸了正确编写可观察代码。 需要:我需要一种方法来返回一个代表多个可观察对象的可观察对象,每个可观察对象都运行一个用户任务。我希望该可观察对象能够返回任务的所有结果,甚至错误。 问题:可观测流会因错误而消亡。如果我有三个任务,而第二个任务引发了一个异常,那么即使第三个任务成

  • 假设存在包含方法的接口: 实现combinedCall方法的最佳方法是什么: 从makeHttpCall获取数据 使用store InDatabase存储它 返回在store InDatabase完成时完成的完成? 似乎在RxJava 1.0中可以执行Completable.merge(可观察),但合并似乎不再接受可观察。

  • 问题内容: 给定汽车清单(),我可以这样做: 有没有办法我可以从一个到一个序列? 像没有参数的东西 问题答案: 您可以这样映射到: 请注意,flatMapping可能不会保留源可观察的顺序。如果订单对您很重要,请使用。

  • 在android 6.0.1 Samsung s6 Edge+上的测试 当device screen脱机并从debug中拔出时,可观察到的只是停止发射项目。如果设备打开,则开始发射对象。另一个问题是,在停止接收项目之前,我会按照相同项目的顺序随机地得到2/3个重复调用 ____________________________edit_________________________________

  • 我有一个带有http请求的服务,它返回我的标题的可观察到的内容 servise.ts 在我的组件中,我有一个函数从service get Request设置。看起来是这样的: 问题是,有时我接收到带有空标签的标题,不需要显示它们,所以我需要对其进行过滤,并对此标题发送.delete()请求。我尝试了类似的方法(想法是在之前添加,然后在另一个subscribe内部调用。)差不多吧 但不确定这是不是个