当前位置: 首页 > 知识库问答 >
问题:

仅在需要时在Retor的Flux中请求下一个

秦浩漫
2023-03-14

我有一个API,它返回一个限制为100个实体的实体列表。如果有更多实体,它将为下一页返回一个标记。

我想创建一个返回所有实体(所有页面)的通量,但仅在需要时(如果要求)。

我编写了以下代码:

class Page {
    String token;
    List<Object> entities;
}

Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
    return fct.apply(token).flatMapMany(page -> {
        if (page.token == null) {
            // no more pages
            return Flux.fromIterable(page.entities);
        }

        return Flux.fromIterable(page.entities).concatWith(Flux.defer(() -> load(page.token, fct)));
    });
}

而且几乎成功了

如果我请求99个元素,则加载第一页,我的流量包含99个元素。

如果我请求150个元素,则加载第一页和第二页,并且我的流量包含150个元素。

但是,如果我请求100个元素,则会加载第一页和第二页(我的流量包含100个元素)。我这里的问题是,第二个页面是在我没有请求第101个元素的地方加载的。

当前行为:

subscribe()
=> Function is called to load page 1
request(10)
=> Received: 0-9
request(89)
=> Received: 10-98
request(1)
=> Received: 99
=> Function is called to load page 2
request(1)
=> Received: 100

应为:在最后一个请求(1)之后加载第2页

这几乎像是在某个地方有预回迁,但我看不到在哪里。有什么想法吗?

共有1个答案

牛兴安
2023-03-14

好的,我找到了。没有按发言进行预回迁。事实上,这是通量。延迟,即在订阅时而不是在请求时加载下一页。

要解决这一问题,一个快速(且肮脏)的测试是:

Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
    return fct.apply(token).flatMapMany(page -> {
        if (page.token == null) {
            // no more pages
            return Flux.fromIterable(page.entities);
        }

        return Flux
                .fromIterable(page.entities)
                .concatWith(
                        // Flux.defer(() -> load(page.token, fct))
                        Flux.create(s -> {
                            DelegateSubscriber[] ref = new DelegateSubscriber[1];

                            s.onRequest(l -> {
                                if (ref[0] == null) {
                                    ref[0] = new DelegateSubscriber(s);
                                    load(page.token, fct).subscribe(ref[0]);
                                }
                                ref[0].request(l);
                            });
                        }));
    });
}

static class DelegateSubscriber extends BaseSubscriber<Object> {

    FluxSink<Object> delegate;

    public DelegateSubscriber(final FluxSink<Object> delegate) {
        this.delegate = delegate;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // nothing
    }

    @Override
    protected void hookOnNext(Object value) {
        delegate.next(value);
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        delegate.error(throwable);
    }
}
 类似资料:
  • 我是Pact.io的新手,正在尝试在我们的平台上设置合同测试。应用程序是这样设置的,每个客户帐户都有自己的数据库模式,直接绑定到一个URL子域。当发出API请求时,除了授权标头之外,还必须提供该URL子域。我可以创建一个静态令牌来与使用者测试一起传递,但是当Pact发送请求时,它不知道要使用哪个帐户。我看不出有什么方法可以通过一个URL子域作为消费者测试的一部分,并且不确定如何强制它使用提供者端的

  • 问题内容: 我有一个链接,它将通过ajax加载一些内容。 我的问题是,我不想删除“加载评论”文本,我只是想不允许此类中的更多点击。 jQuery的 我只想第一次看到OK。如何删除该课程? 这不起作用… http://jsfiddle.net/qsn1tuk1/ 问题答案: 使用jQuery的功能 http://www.w3schools.com/jquery/event_one.asp one()

  • 我正在开发一个被其他项目使用的库。该库通过JDBC提供数据库访问,我想在同一个库中也添加对R2DBC的支持。使用项目应该能够根据配置属性在JDBC和R2DBC之间切换。 我面临的问题是(2.5.4)提供的R2DBC自动配置覆盖了JDBC配置,并且使用的项目只能使用R2DBC。 此外,在构建项目时,有些任务,如留档或代码生成、测试等,取决于正在加载的Spring上下文,但不需要数据库访问。这些任务失

  • 我正在使用Reformation和OkHttpClient在Android上构建RESTAPI。 不久前,我注意到api提出的第一个请求总是比其他所有请求需要更长的时间来处理...一开始我不在乎,因为这是一个可以接受的时间。 但是请求时间突然跳到了60秒 所有这些时间都浪费在客户端,因为监视服务器我发现处理时间不到1秒。。。 我想知道我做了什么改变会产生如此大的影响,然后我意识到我改变了OkHtt

  • 问题内容: 我将Require.js与Angular.js结合使用。 一些控制器需要巨大的外部依赖关系,而其他控制器则不需要,例如,需要Angular UI Codemirror 。这至少是135 kb,至少: 我不想每次我的页面加载时都只包含指令和Codemirror lib只是为了使Angular开心。 这就是为什么我现在仅在遇到路线时才加载控制器,就像这里所做的那样。 但是,当我需要类似的东

  • 我有一个pip的包文件(dependencies.conf),其中包括我的应用程序需要的一系列包: 在构建过程中,我使用以下方式下载所有软件包: 然后在部署过程中,我想安装这些文件,只有当安装的版本不同于我需要的,并且顺序正确(依赖关系) 我目前正在使用以下工具: 但是这是错误的,因为它没有验证版本(如果需要,我是为了降级包),并且它没有处理正确的依赖顺序。 有没有简单的方法可以做到这一点?(我基