当前位置: 首页 > 知识库问答 >
问题:

限制正在运行的promise的并发性

羊煜
2023-03-14

我正在寻找一个promise函数包装,它可以在给定的promise运行时限制/节流,以便在给定的时间只运行该promise的一组数量。

在以下情况下,delayPromise不应同时运行,它们应按照先到先得的顺序一次运行一个。

import Promise from 'bluebird'

function _delayPromise (seconds, str) {
  console.log(str)
  return Promise.delay(seconds)
}

let delayPromise = limitConcurrency(_delayPromise, 1)

async function a() {
  await delayPromise(100, "a:a")
  await delayPromise(100, "a:b")
  await delayPromise(100, "a:c")
}

async function b() {
  await delayPromise(100, "b:a")
  await delayPromise(100, "b:b")
  await delayPromise(100, "b:c")
}

a().then(() => console.log('done'))

b().then(() => console.log('done'))

关于如何设置这样的队列有什么想法吗?

我有一个来自奇妙的Benjamin Grunbaum的“去盎司”功能。我需要修改它,以限制基于自身执行的promise,而不是延迟。

export function promiseDebounce (fn, delay, count) {
  let working = 0
  let queue = []
  function work () {
    if ((queue.length === 0) || (working === count)) return
    working++
    Promise.delay(delay).tap(function () { working-- }).then(work)
    var next = queue.shift()
    next[2](fn.apply(next[0], next[1]))
  }
  return function debounced () {
    var args = arguments
    return new Promise(function (resolve) {
      queue.push([this, args, resolve])
      if (working < count) work()
    }.bind(this))
  }
}

共有3个答案

蒋鹏鹍
2023-03-14

我也有同样的问题。我写了一个库来实现它。代码在这里。我创建了一个队列来保存所有的promise。当您将一些promise推入队列时,队列顶端的前几个promise将被弹出并运行。一旦完成了一个promise,队列中的下一个promise也将弹出并运行。一次又一次,直到队列没有任务。您可以查看代码了解详细信息。希望这个图书馆能帮助你。

井通
2023-03-14

使用限制promise模块:

https://www.npmjs.com/package/throttled-promise

var ThrottledPromise = require('throttled-promise'),
    promises = [
        new ThrottledPromise(function(resolve, reject) { ... }),
        new ThrottledPromise(function(resolve, reject) { ... }),
        new ThrottledPromise(function(resolve, reject) { ... })
    ];

// Run promises, but only 2 parallel
ThrottledPromise.all(promises, 2)
.then( ... )
.catch( ... );
丌官翰采
2023-03-14

我不认为有任何库可以做到这一点,但实际上自己实现起来非常简单:

function queue(fn) { // limitConcurrency(fn, 1)
    var q = Promise.resolve();
    return function(x) {
        var p = q.then(function() {
            return fn(x);
        });
        q = p.reflect();
        return p;
    };
}

对于多个并发请求,它变得有点棘手,但也可以这样做。

function limitConcurrency(fn, n) {
    if (n == 1) return queue(fn); // optimisation
    var q = null;
    var active = [];
    function next(x) {
        return function() {
            var p = fn(x)
            active.push(p.reflect().then(function() {
                active.splice(active.indexOf(p), 1);
            })
            return [Promise.race(active), p];
        }
    }
    function fst(t) {
        return t[0];
    }
    function snd(t) {
        return t[1];
    }
    return function(x) {
        var put = next(x)
        if (active.length < n) {
            var r = put()
            q = fst(t);
            return snd(t);
        } else {
            var r = q.then(put);
            q = r.then(fst);
            return r.then(snd)
        }
    };
}

顺便说一句,你可能想看看演员模型和CSP。他们可以简化处理这些事情,也有一些JS库。

实例

import Promise from 'bluebird'

function sequential(fn) {
  var q = Promise.resolve();
  return (...args) => {
    const p = q.then(() => fn(...args))
    q = p.reflect()
    return p
  }
}

async function _delayPromise (seconds, str) {
  console.log(`${str} started`)
  await Promise.delay(seconds)
  console.log(`${str} ended`)
  return str
}

let delayPromise = sequential(_delayPromise)

async function a() {
  await delayPromise(100, "a:a")
  await delayPromise(200, "a:b")
  await delayPromise(300, "a:c")
}

async function b() {
  await delayPromise(400, "b:a")
  await delayPromise(500, "b:b")
  await delayPromise(600, "b:c")
}

a().then(() => console.log('done'))
b().then(() => console.log('done'))

// --> with sequential()

// $ babel-node test/t.js
// a:a started
// a:a ended
// b:a started
// b:a ended
// a:b started
// a:b ended
// b:b started
// b:b ended
// a:c started
// a:c ended
// b:c started
// done
// b:c ended
// done

// --> without calling sequential()

// $ babel-node test/t.js
// a:a started
// b:a started
// a:a ended
// a:b started
// a:b ended
// a:c started
// b:a ended
// b:b started
// a:c ended
// done
// b:b ended
// b:c started
// b:c ended
// done
 类似资料:
  • 可能有助于为这个问题提供一点背景上下文:我正在构建一个角服务,它有助于将多部分表单数据(mp4视频)块上传到云中的存储服务。 我试图限制同时发生的未解决promise(chunk data请求)的数量。我使用的是侦听正在解析的所有区块上载promise,然后在发生这种情况时返回异步调用(以完成文件)。我认为我的算法遇到了竞争条件,因为在为具有大量块的文件安排所有作业之前被调用,但在较小的文件中成功

  • 有没有办法限制节点js中一次执行的并发Qpromise的数量? 我正在构建一个web scraper,它必须请求并解析3000多个页面,并且在没有限制的情况下,我提出的一些请求没有及时响应,因此连接停止,所需的响应(html代码)变得不可用。 为了解决这个问题,我发现限制请求的数量可以解决我的问题。 我曾尝试过以下方法,但均无效: Qpromise中的并发限制-节点 我需要请求一个URL数组,一次

  • Promise的限制 本节中我们将要讨论的许多细节已经在这一章中被提及了,但我们将明确地复习这些限制。 顺序的错误处理 我们在本章前面的部分详细讲解了Promise风格的错误处理。Promise的设计方式——特别是他们如何链接——所产生的限制,创建了一个非常容易掉进去的陷阱,Promise链中的错误会被意外地无声地忽略掉。 但关于Promise的错误还有一些其他事情要考虑。因为Promise链只不

  • 是否有一种方法/pattern来实现? promise-数组包含构造和返回promise的函数 应在所有解析后解析 只有promise应并行运行 第n+1个promise应在n个完成后立即开始。以便始终有解析器并行运行。

  • 有没有使用Qpromise库限制promise并发的方法? 这个问题有点与如何限制Qpromise并发有关? 但问题是我正试图这样做: 真正的用例是: 从DB获取帖子 循环DB中的每个帖子,如 对于每个帖子做task1,task2,task3(检索社交计数器,检索评论计数等) 在DB中保存新的文章数据。 但问题是node同时执行所有帖子的所有任务,比如同时向facebook询问500篇帖子的“喜欢

  • 问题内容: 我有要处理的网址列表,但我想一次运行最大数量的goroutine。例如,如果我有30个网址,那么我只希望10个goroutine并行工作。 我对此的尝试如下: 我的理解是,如果创建一个大小为并行的缓冲通道,那么该代码将阻塞,直到我读取结果通道为止,这将取消阻塞我的代码并允许生成另一个goroutine。但是,此代码似乎在处理完所有网址后不会阻塞。有人可以向我解释如何使用通道限制运行的g