当前位置: 首页 > 知识库问答 >
问题:

为什么一些Reactor操作员要求的元素远远超过他们的兴趣?

齐昊
2023-03-14

我有以下代码:

Flux<String> flux = Flux.<String>never()
        .doOnRequest(n -> System.out.println("Requested " + n));

这是一种从不发出任何信号,但向控制台报告需求的流量。

以下三行中的每一行

flux.take(3).next().block();
flux.next().block();
flux.blockFirst();

产生此输出:

Requested 9223372036854775807

查看代码,我看到以下内容。

BlockingSingleSubscriber(在Flux#block First()Mono#block()的情况下都有效:

public final void onSubscribe(Subscription s) {
    this.s = s;
    if (!cancelled) {
        s.request(Long.MAX_VALUE);
    }
}

<代码>MonoNext。下一个订阅者:

public void request(long n) {
    if (WIP.compareAndSet(this, 0, 1)) {
        s.request(Long.MAX_VALUE);
    }
}

<代码>FluxTake。TakeSubscriber:

public void request(long n) {
    if (wip == 0 && WIP.compareAndSet(this, 0, 1)) {
        if (n >= this.n) {
            s.request(Long.MAX_VALUE);
        } else {
            s.request(n);
        }
        return;
    }

    s.request(n);
}

因此,Flux#blockFirst()、Flux#next()Mono#block()总是向其上游发出无限需求的信号,在某些情况下,Flux#take()也可以这样做。

但是,Flux#blockFirst()、Flux#next()Mono#block()都需要最多一个来自其上游的元素,而Flux#take()则需要最多的元素。n

此外,Flux#走()javadoc说:

请注意,此操作员不会操纵背压请求量。相反,它只允许来自下游的请求按原样传播,并在发出N个元素后取消。因此,源可以同时产生大量外来元素。如果这种行为不受欢迎,并且您不拥有来自下游的请求(例如预取操作符),请考虑改用{@link#limitRequest(long)}。

问题是:当他们预先知道限额时,为什么他们会发出无限需求的信号?我的印象是,反应性背压只是问你准备消费什么。但实际上,它通常是这样工作的:向上游喊出“尽你所能”,然后在满足后取消订阅。在上游生成数百万条记录的成本很高的情况下,这似乎只是浪费。

共有1个答案

巫马松
2023-03-14

tl;灾难恢复-在基于拉的系统中,仅请求您需要的通常是理想的,但在基于推的系统中很少是理想的。

我的印象是,反应性背压只是问你准备消费什么。

不完全是,这是你能消费的东西。这种差别很细微,但很重要。

在基于拉动的系统中,您是完全正确的-请求比您知道的更多的值几乎永远都不是一件好事,因为您请求的值越多,就需要进行更多的工作来产生这些值。

但是请注意,反应式流本质上是基于推送的,而不是基于拉动的。大多数反应式框架,包括反应器,都是在考虑到这一点的情况下构建的——尽管混合或基于拉动的语义学是可能的(例如,使用Flux.generate()一次按需生成一个元素),但这在很大程度上是一个次要用例。通常情况下,有一个发布者需要卸载大量数据,并且它“希望”尽快将这些数据推送给你以摆脱它。

这一点很重要,因为它从请求的角度颠覆了理想的观点。这不再是一个“我最需要什么”的问题,而是“我能处理的最多的是什么”——数字越大越好。

例如,假设我有一个数据库查询,返回2000条连接到flux的记录,但我只需要1条。如果我有一个发布者正在推送这2000条记录,并且我调用了request(1),那么我根本没有“帮助”事情-我没有减少数据库方面的处理,这些记录已经在那里等待了。然而,由于我只请求了1,所以发布者必须决定是否可以缓冲剩余的记录,或者最好跳过部分或全部记录,或者如果无法保持,则应该抛出异常,或者完全执行其他操作。不管它做什么,我请求的记录更少,实际上导致了更多的工作,在某些情况下甚至可能是一个例外。

诚然,这并不总是可取的——也许Flux中的那些额外元素确实会导致浪费的额外处理,也许网络带宽是一个主要问题,等等。在这种情况下,您会希望显式调用Limited Request()。不过,在大多数情况下,这可能不是您想要的行为。

(为了完整性起见,最好的方案当然是限制源数据——例如,如果您只想要一个值,请在您的数据库查询中放置LIMIT 1。然后您不必担心这些东西。但是,当然,在现实世界的使用中,这并不总是可能的。)

 类似资料:
  • C++20概念的一个特点是,在某些情况下,您必须编写。例如,[expr.prim.req]/3中的这个示例:

  • $ git clone <url> 复制clone命令就会自动设定为追踪远程数据库 。这样,在执行push或fetch/pull命令时,即使省略repository,也可以正确地显示/读取修改内容。 入门篇 【共享数据库】 克隆远程数据库 入门篇 【教程2 共享数据库】 克隆远程数据库 $ git remote add <name> <url> 显示远程数据库列表 $ git remote 添加-

  • 问题内容: 为什么我们永远不能将元素作为元素的子元素? 我用以下代码制作了一个网页 在这里,元素是p元素的子元素。但是,在所有主流浏览器(Chrome,Firefox和Internet Explorer-所有最新版本)中,其解释如下 我通过右键单击元素(在Chrome中)并选择检查元素选项来对其进行检查。我在Chrome中看到了它,但是其他两个浏览器的行为也相同(CSS选择器“ p ul”无法正常

  • 我正在编写一个函数,该函数应该删除链表的最后一个元素 这是我对节点的定义 我有一个节点列表1,它为值1、2、3运行了3次insert函数 insert函数如下所示 现在我的delete_last_element函数如下所示 基本上,我的想法是,我会先看看第一个元素是否为空,如果是,我什么也不做,因为没有最后一个元素。然后我将curr设置为head->next以查看列表中的下一个元素,如果它为nul

  • 问题内容: ImmutableSet实现Set接口。对于而言,没有意义的功能现在称为的“可选操作” 。我认为这种情况。因此,现在引发了许多可选操作。 这对我来说似乎是倒退。我被告知接口是一种契约,因此您可以在不同的实现中使用强制功能。可选操作的方法似乎从根本上改变(矛盾?)接口的作用。今天实现这一点,我会将接口分为两个接口:一个接口用于不可变的操作,第二个接口用于扩展器的那些操作。(很快,袖带解决

  • 本文向大家介绍块的主要元素是什么?相关面试题,主要包含被问及块的主要元素是什么?时的应答技巧和注意事项,需要的朋友参考一下 回答:以下是块的主要元素: 指向上一个块的哈希指针 时间戳记 交易清单