当前位置: 首页 > 知识库问答 >
问题:

萃取器在反应器堵塞中的作用(从性能上看)

池砚文
2023-03-14

我注意到不建议使用这样的提取器mono.toFuture()flux.collectlist(),因为它们会阻塞流。

我不太确定“封锁”是以哪种方式进行的。就像下面的代码一样,我知道flux.collectlist()将等待所有项的完成,它是像某个线程一样等待,还是只是最后一个线程在最后完成.collectlist()的事情?

有人认为mono.toFuture()也会阻塞,它是否会立即返回一个“future”(当onnext()oncomplete()发生时,该future将可用),或者直到onnext()oncomplete()发生时才返回?

        var m = Flux.range(0, 100)
                .parallel()
                .runOn(Schedulers.boundedElastic())
                .map(i -> Mono.fromFuture(
                        Mono.just(i).map(n -> {
                            try {
                                var s = (long) (Math.random() * 100);
                                Thread.sleep(s);
                                System.out.println(Thread.currentThread() + "after " + s + "ms awaking: " + n);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            return n;
                        }).toFuture())
                )
                .doOnNext(o -> System.out.println(Thread.currentThread() + "before sequential"))
                .sequential();

        var mm = Flux.merge(m)
                .doOnNext(o -> System.out.println(Thread.currentThread() + "before collecting"))
                .collectList()
                .doOnNext(o -> System.out.println(Thread.currentThread() + "before map"))
                .map(list -> list.stream().map(i -> i).collect(Collectors.toList()))
                .publishOn(Schedulers.single())
                .toFuture();

共有1个答案

南宫建白
2023-03-14

你的假设不太正确。

mono.toFuture()根本不是阻塞--它只是返回一个CompleteableFuture,您可以阻塞它(如果您调用它的get()方法)或者异步执行它(如果您使用它的任何异步方法,如thenApply()thenCompose()等),您可以跳出Reactor上下文,从而丢失反压力之类的东西,但不必立即阻塞。

您可能会想到reactor的(非常)旧版本,我认为这里有一个toFuture()变体返回Future,而不是CompleteableFuture-虽然这也没有阻塞,但它将您置于一个必须阻塞的上下文中,因为Future没有异步组件。因此,虽然方法调用本身没有阻塞,但这是您唯一的选择。

与普遍的看法相反,flux.collectlist()也不阻塞--它专门返回一个单字 ,这是一个将发出单个元素的非阻塞发布服务器,该元素是该通量中所有内容的列表。当然,您可以在此发布服务器上调用block(),该操作将是阻塞--但是单独调用collectlist()并不比任何其他运算符更阻塞。

话虽如此,它肯定会引起问题。由于它所做的工作的性质(将流量中的所有元素收集到内存中的单个列表中),它可能并不理想:

  • 您可能需要等待很长时间才能发出列表,而没有关于它包含多少元素的反馈,也没有关于它是否被填充的反馈;
  • 如果流量中的元素数或元素大小特别大,则可能会耗尽内存
  • 当添加元素时,您不能输出任何中间状态,因此您将失去流式JSON支持等功能。

然而,这并不使它阻塞,这只是意味着在决定是否值得在PariuClar场景中使用操作符之前,您需要权衡一组不同的潜在问题。

 类似资料:
  • 我有一个小小的React应用程序项目,我已经在Github中部署了它。它的工作原理,甚至我使用导入{浏览器路由器,链接,交换机,路由}从"react-router-dom";路由和作品...这是我的代码:类应用程序扩展反应。组件{ render(){ } }导出默认应用程序;' 我在本地机器上使用过这个,没有“basename”,并且工作过。现在,在github服务器中,我的问题是,当您访问我的应

  • 我的导航不工作,因为某些原因,我有另一个应用程序运行良好,但这一个不能找到错误,请帮助 反应路由器不工作。反应JS 我需要你的帮助。 使用react-router,我可以使用Link元素创建由react router本地处理的链接。

  • 我正在使用一个基于Symfony的react应用程序,并试图使用react-datepicker模块包含一个datepicker。 我可以创建datepicker对象,但它的样式似乎不正确——当我单击选择日期时,页面顶部只有一个垂直的数字列表(我看到很多人在网上经历过这种情况,通常是因为他们没有导入datepicker css)。 我正在导入react-datepicker.css文件,如果我在浏

  • 我想知道正常的java API调用(我的意思是没有I/O的方法)是否应该被线程化为“迷你阻塞调用”?是否可以像这样实现Reactive Streams(在返回Publisher之前调用方法): 而不是(在流中调用它) 此验证器仅用于示例。这种方法是否有任何缺点,或者这些方法在返回语句之前应该总是包含在流中?

  • 我用的是Spring项目Reactor堆芯3.1.8。释放我正在为我的微服务实现一个日志框架,使其具有JSON审计日志,因此使用上下文来存储某些字段,如userID、collaboration ID、component Name和其他几个在请求生命周期中常见的字段。由于不能在反应式服务中用于存储这些元素,因此我必须使用上下文。但是,要了解上下文显然非常困难。我可以通过函数调用从信号中获取对上下文的

  • 我应该说我对spring-cloud-sleuth和Zipkin的简单实用印象深刻。 然而,我正在研究一个POC,我正在考虑为其开发反应工具包。VertX3是我列表中第一个尝试的项目(使用spring cloud生态系统)。我想知道Sleuth日志跟踪是否可以在反应上下文中工作,因为我猜它依赖于ThreadLocals在上下文中传递?渴望了解Sleuth在反应性环境中的地位。