当前位置: 首页 > 知识库问答 >
问题:

如何在Flux上同时调用订阅和阻止?

狄高畅
2023-03-14

我一直在试验项目反应器和反应流。我在使用订阅On使流在不同的线程上运行时遇到了一个问题。将我的代码放在主线程中,我需要主线程块,直到流完成,所以我做了这样的事情:

        Flux.just(1, 2, 3, 4)
                .log()
                .subscribeOn(Schedulers.parallel())
                .subscribe((i) -> {
                   // some operation 
                });

        try {
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Finished");

然后我注意到有一个块最后()方法执行阻塞。但是我不能同时使用订阅和块最后,因为它们不返回Flux

有什么优雅的方法可以做到这一点吗?

共有1个答案

华凌
2023-03-14

所有块方法都代表您进行订阅。您可以将原本放在订阅lambda中的代码移动到等效的doOn*方法中

 类似资料:
  • 我们发现,在调用主题上的onNext事件后10-20 ms内订阅序列化PublishSubject时;未调用新订阅者的onNext。 在下面的代码片段中;要观察的值[1]被给出为“2000”,并且在使用值为1998[2]的主题上调用onNext()后调用订阅对象();我们看到,如果间隔为10ms,新订阅者将错过主题触发的值2000;然而,如果间隔为50ms或更大,那么新订阅者似乎会收到预期值;这是

  • 我写了一个逻辑,使用Spring反应器库来获取所有运算符,然后在异步模式下为每个运算符(分页)获取所有设备。 创建了一个通量来获取所有运算符,然后订阅它。 现在,对于每个运算符,我正在获取需要多个订阅才能获得设备mono的设备,该设备通过订阅MONO获得所有页面异步。 此代码工作正常。但我的问题是,从内部订阅mono是一个好主意吗?

  • 例: 注意:是非Android 运行环境, 使用的是RxJava2.x

  • 所以我有一个。我想知道每个下游订户调用何时发出,这既是为了测量每个下游订户花费的时间,也是为了反压。 让我半途而废--我可以用自己的包装每个单独的订阅服务器,如下面的示例所示。当所有下游订户完成下一个调用时,它不会通知我,而我自己也不必做一些记账。 提前感谢!

  • mysql会员订阅数据表的设计应该如何设计?产品有订阅商品和非订阅的,每次都只能购买一个。 订阅有1个月 3个月的 每次到期自动扣费。如果在一个月类购买了几个订阅商品 则扣费按照最新的一个 然后延长到期时间。其实是不是每次订阅都不需要生成新订单的 翻阅了其他资料都找不到很好的设计

  • 我正在将redis发布/子系统转换为redis streams,这样我就可以为服务器发送的事件添加一些容错功能。 用传统的方式订阅很简单: 这会永久阻塞,并保持连接打开。在这种情况下,发布到redis将添加到您的日志中。 切换到流时,您使用而不是订阅,使用而不是发布,数据会有很大的不同。我正在努力解决的部分是阻塞。 发送消息时,我的“订阅”会接收第一条消息,但不会记录更多消息。