当前位置: 首页 > 知识库问答 >
问题:

为最大请求/秒的批次处理作业选择适当的异步方法

冯曾笑
2023-03-14

我需要执行对一些外部API的循环调用,并有一定的延迟,以防止“超出用户速率限制”限制。

谷歌地图地理编码API对req/sec敏感,允许10 req/sec。我应该为数百个联系人进行地理编码,这样的延迟是必需的。所以,我需要有一个10异步地理编码函数,每个函数延迟1秒。所以,我收集数组中的所有触点,然后以异步方式在数组中循环。

一般来说,我需要有N个同时线程,每个线程的末尾延迟为D毫秒。整个循环遍历一系列User实体。每个线程处理单个实体,像往常一样。

我想应该有这样一个代码:

const N = 10;   # threads count
const D = 1000; # delay after each execution

var processUser = function(user, callback){ 
  someBusinessLogicProc(user, function(err) {
    setTimeout(function() {
      return callback(err);
    }, D);
  });      
 }

 var async = require('async') ;
 var people = new Array(900);

 async.batchMethod(people, processUser, N, finalCallback);

在这个伪代码中,batchMethod是我想要的方法

共有1个答案

杨飞
2023-03-14

延迟结果并不是你真正想要的。相反,您希望跟踪您发送的内容,以及何时发送,以便一旦您达到每秒请求数的界限,就可以发送另一个请求。

下面是一个函数的一般概念,它将控制速率限制为每秒固定数量的请求。这将使用promise,并要求您提供一个返回promise的请求函数(如果您现在不使用promise,您只需要将请求函数包装为promise)。

// pass the following arguments:
//   array - array of values to iterate
//   requestsPerSec - max requests per second to send (integer)
//   maxInFlight - max number of requests in process at a time
//   fn - function to process an array value
//        function is passed array element as first argument
//        function returns a promise that is resolved/rejected when async operation is done
// Returns: promise that is resolved with an array of resolves values
//          or rejected with first error that occurs
function rateLimitMap(array, requestsPerSec, maxInFlight, fn) {
    return new Promise(function(resolve, reject) {
        var index = 0;
        var inFlightCntr = 0;
        var doneCntr = 0;
        var launchTimes = [];
        var results = new Array(array.length);

        // calculate num requests in last second
        function calcRequestsInLastSecond() {
            var now = Date.now();
            // look backwards in launchTimes to see how many were launched within the last second
            var cnt = 0;
            for (var i = launchTimes.length - 1; i >= 0; i--) {
                if (now - launchTimes[i] < 1000) {
                    ++cnt;
                } else {
                    break;
                }
            }
            return cnt;            
        }

        function runMore() {
            while (index < array.length && inFlightCntr < maxInFlight && calcRequestsInLastSecond() < requestsPerSec) {
                (function(i) {
                    ++inFlightCntr;
                    launchTimes.push(Date.now());
                    fn(array[i]).then(function(val) {
                        results[i] = val;
                        --inFlightCntr;
                        ++doneCntr;
                        runMore();
                    }, reject);
                })(index);
                ++index;
            }
            // see if we're done
            if (doneCntr === array.length) {
                resolve(results);
            } else if (launchTimes.length >= requestsPerSec) {
                // calc how long we have to wait before sending more
                var delta = 1000 - (Date.now() - launchTimes[launchTimes.length - requestsPerSec]);
                if (delta >= 0) {
                    setTimeout(runMore, ++delta);
                }

            }
        }
        runMore();
    });
}

示例用法:

rateLimitMap(inputArrayToProcess, 9, 20, myRequestFunc).then(function(results) {
    // process array of results here
}, function(err) {
    // process error here
});

Github上有一个更高级的函数版本,名为rateMap()

这段代码背后的基本思想是:

  1. 您将传入一个数组以进行迭代
  2. 它返回一个promise,其解析值是一个结果数组(按顺序)
  3. 您传递的requestsPerSec的最大数量为曾经命中的次数
  4. 您在飞行中同时传递最大数量的请求
  5. 您传递一个函数,该函数将从正在迭代的数组中传递一个元素,并且必须返回一个promise
  6. 它在最后一次发送请求时保留一个时间戳数组
  7. 为了查看是否可以发送另一个请求,它在数组中向后查看,并统计在最后一秒钟发送了多少个请求
  8. 如果该数字低于阈值,则会发送另一个
  9. 如果该数字达到阈值,那么它将计算出您需要等待多长时间才能发送另一个数字,并为该时间量设置计时器
  10. 完成每个请求后,它会检查是否可以发送更多请求
  11. 如果任何请求拒绝其promise,则返回的promise立即拒绝。如果您不希望它在出现第一个错误时停止,那么请修改传入函数,使其不拒绝,而是使用某个值进行解析,以便在以后处理结果时将其标识为失败的请求

这是一个工作模拟:https://jsfiddle.net/jfriend00/3gr0tq7k/

注意:如果你传入的maxInFlight值高于请求PerSec值,那么这个函数基本上只发送请求PerSec请求,然后一秒钟后,发送另一个请求PerSec请求,因为这是保持在<代码>请求PerSec边界。如果maxInFlight值与请求PerSec相同或更低,那么它将发送请求PerSec,然后当每个请求完成时,它将查看是否可以发送另一个请求。

 类似资料:
  • Spring MVC 3.2开始引入了基于Servlet 3的异步请求处理。相比以前,控制器方法已经不一定需要返回一个值,而是可以返回一个java.util.concurrent.Callable的对象,并通过Spring MVC所管理的线程来产生返回值。与此同时,Servlet容器的主线程则可以退出并释放其资源了,同时也允许容器去处理其他的请求。通过一个TaskExecutor,Spring M

  • 我正在用异步JobLauncher在Spring Batch中配置一个(长时间运行的)作业,我有两个RESTendpoint: null 谢谢朱利奥

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 我遵循了spring批处理文档,无法异步运行我的作业。 因此,我从一个web容器运行该作业,该作业将通过RESTendpoint触发。 我想让JobInstance ID在完成整个作业之前传递它作为响应。因此,他们可以稍后使用JobInstance ID检查作业的状态,而不是等待。但我没能让它工作。下面是我尝试过的示例代码。请让我知道我错过了什么或错了什么。 BatchConfig创建异步JobL

  • Spring批处理作业与flatfileitemreader(从csv读取)、processor(更新adwords api提要详细信息,对于csv文件中的每个记录(大约有40条记录),这一步大约需要40秒)和正在更新DB中记录的定制writer一起使用。 web.xml

  • 本文向大家介绍springmvc处理异步请求的示例,包括了springmvc处理异步请求的示例的使用技巧和注意事项,需要的朋友参考一下 springmvc 3.2开始就支持servlet3.0的异步请求。平常我们请求一个controller一般都是同步的,如果在代码执行中,遇到耗时的业务操作,那servlet容器线程就会被锁死,当有其他请求进来的时候就会受堵了。 springmvc3.2之后支持异