当前位置: 首页 > 知识库问答 >
问题:

如何限制活动Spring WebClient调用的数量

裴俊迈
2023-03-14

我有一个要求,即我使用Spring批处理从SQL DB中读取大量行(数千行),并在编写Kafka主题之前调用REST服务来丰富内容。

使用Spring Reactive webClient时,如何限制活动非阻塞服务调用的数量?在我使用Spring Batch读取数据后,我应该在循环中引入通量吗?

(我理解delayElements的用法,它有不同的目的,比如当一个获取服务调用带来大量数据,你希望服务器慢下来——尽管如此,我的用例有点不同,因为我有很多WebClient调用并希望限制调用次数,以避免内存溢出问题,但仍能获得非阻塞调用的优势)。

共有1个答案

端木骞尧
2023-03-14

非常有趣的问题。我仔细考虑了一下,想到了一些如何做到这一点的想法。我将分享我的想法,希望这里有一些想法可以帮助你进行调查。

不幸的是,我不熟悉Spring Batch。然而,这听起来像是利率限制的问题,或者传统的生产者-消费者问题。

所以,我们有一个生产者,产生了如此多的信息,我们的消费者无法跟上,中间的缓冲变得难以忍受。

我看到的问题是,正如您所描述的那样,您的Spring Batch流程不是作为流或管道工作的,而是您的反应性Web客户端。

因此,如果我们能够以流的形式读取数据,那么当记录开始进入管道时,这些记录将被反应性网络客户端处理,并且使用反压力,我们可以从生产者/数据库方控制流的流。

制片方

因此,我要改变的第一件事是如何从数据库中提取记录。我们需要控制一次从数据库中读取多少条记录,或者通过对数据检索进行分页,或者通过控制获取大小,然后通过背压控制有多少条记录通过反应性管道发送到下游。

因此,考虑以下(基本的)数据库数据检索,包装在Flux中。

Flux<String> getData(DataSource ds)  {
    return Flux.create(sink -> {
        try {
            Connection con = ds.getConnection();
            con.setAutoCommit(false);
            PreparedStatement stm = con.prepareStatement("SELECT order_number FROM orders WHERE order_date >= '2018-08-12'", ResultSet.TYPE_FORWARD_ONLY);
            stm.setFetchSize(1000);
            ResultSet rs = stm.executeQuery();

            sink.onRequest(batchSize -> {
                try {
                    for (int i = 0; i < batchSize; i++) {
                        if (!rs.next()) {
                            //no more data, close resources!
                            rs.close();
                            stm.close();
                            con.close();
                            sink.complete();
                            break;
                        }
                        sink.next(rs.getString(1));
                    }
                } catch (SQLException e) {
                    //TODO: close resources here
                    sink.error(e);
                }
            });
        }
        catch (SQLException e) {
            //TODO: close resources here
            sink.error(e);
        }
    });
}

在上面的例子中:

  • 通过设置fetch size,我将每批读取的记录数量控制为1000
  • 接收器将发送订户请求的记录量(即batchSize),然后等待订户使用背压请求更多记录
  • 当结果集中没有更多记录时,我们完成接收并关闭资源
  • 如果在任何时候发生错误,我们都会发回错误并关闭资源
  • 或者,我可以使用分页来读取数据,通过在每个请求周期重新发出查询,可能简化了资源的处理 <> LI>如果订阅被取消或处理,您也可以考虑做一些事情(<代码>下沉。取消/<代码>,<代码>下沉。OnDebug >,因为关闭连接和其他资源是基本的。<李>

消费者方面

用户端,您注册了一个订阅者,该订阅者当时仅以1000的速度请求消息,并且在处理该批消息后,它只会再次请求消息。

getData(source).subscribe(new BaseSubscriber<String>() {

    private int messages = 0;

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        subscription.request(1000);
    }

    @Override
    protected void hookOnNext(String value) {
        //make http request
        System.out.println(value);
        messages++;
        if(messages % 1000 == 0) {
            //when we're done with a batch
            //then we're ready to request for more
            upstream().request(1000);
        }
    }
});

在上面的示例中,当订阅启动时,它请求第一批1000条消息。在onNext中,我们处理第一批,使用Web客户端生成超文本传输协议请求。

一旦批次完成,然后我们向出版商请求另一批1000,依此类推。

这就是你的梦想!使用背压,您可以控制此时有多少打开的HTTP请求。

我的示例非常初级,需要做一些额外的工作来准备生产,但我相信这有希望提供一些可以适应Spring批处理场景的想法。

 类似资料:
  • 问题内容: Iam使用Sherlock片段制作动作栏。而且我在操作栏中有ListView,但是我将Listview称为活动..请帮助我,谢谢.. :) 在这段代码中,有一个使用数组list的listview。而且我想mylist可以调用相同的活动。 因此可以在操作栏中点赞菜单。当我们点击listview ..他可以打电话给活动谢谢..请帮助我! 问题答案: 您需要在列表视图上使用setOnItem

  • 是否可以限制siteminder SP上受保护资源上的活动用户数?我想确保受保护的资源被有限数量的活动用户使用,比如说n。 谢谢Andrea

  • 在我的程序中,我有一个当应用程序打开时启动的活动。如果我再打开几个活动,我怎么能回到主活动?在意图过滤器中,活动的名称是“android.intent.action.MAIN”,它不允许我在上面调用start Active()。我该怎么办?

  • 问题内容: 我想从其他活动中调用主要活动中的公共方法。我怎样才能做到这一点? 问题答案: 这取决于。 在这种情况下,如果您只想使用某些共享功能(例如执行一些计算的代码)。 我建议将此共享功能移至某些独立的类,然后从那里进行调用。 在这种情况下,如果您想调用MainActivity,则MainActivity使用MainActivity UI进行了某些操作,则您必须使用Intent(http://d

  • 我刚接触Android,有一个带有片段的选项卡布局,我有一个编辑文本字段,它是在我的父活动中声明的,我想检查编辑文本字段是否为空,是否来自片段。我该怎么做呢?这是我做的,但它显示了错误。这是我的主要活动编辑文本字段: 这是我的片段活动: 这是我的日志:

  • 编辑:这是我的舱单