限制正在运行的承诺的并发性

Tho*_*ggi 6 javascript node.js promise bluebird

我正在寻找一个promise函数包装器,它可以在给定的promise运行时限制/限制,以便在给定的时间只运行一定数量的promise.

在下面的情况下delayPromise,永远不应该同时运行,它们应该以先来先服务的顺序一次运行一个.

import Promise from 'bluebird'

function _delayPromise (seconds, str) {
  console.log(str)
  return Promise.delay(seconds)
}

let delayPromise = limitConcurrency(_delayPromise, 1)

async function a() {
  await delayPromise(100, "a:a")
  await delayPromise(100, "a:b")
  await delayPromise(100, "a:c")
}

async function b() {
  await delayPromise(100, "b:a")
  await delayPromise(100, "b:b")
  await delayPromise(100, "b:c")
}

a().then(() => console.log('done'))

b().then(() => console.log('done'))
Run Code Online (Sandbox Code Playgroud)

关于如何获得像这样的队列设置的任何想法?

我有精彩的"去抖"功能Benjamin Gruenbaum.我需要修改它来根据它自己的执行而不是延迟来限制一个承诺.

export function promiseDebounce (fn, delay, count) {
  let working = 0
  let queue = []
  function work () {
    if ((queue.length === 0) || (working === count)) return
    working++
    Promise.delay(delay).tap(function () { working-- }).then(work)
    var next = queue.shift()
    next[2](fn.apply(next[0], next[1]))
  }
  return function debounced () {
    var args = arguments
    return new Promise(function (resolve) {
      queue.push([this, args, resolve])
      if (working < count) work()
    }.bind(this))
  }
}
Run Code Online (Sandbox Code Playgroud)

Ber*_*rgi 6

我认为没有任何库可以做到这一点,但实际上实现起来非常简单:

function queue(fn) { // limitConcurrency(fn, 1)
    var q = Promise.resolve();
    return function(x) {
        var p = q.then(function() {
            return fn(x);
        });
        q = p.reflect();
        return p;
    };
}
Run Code Online (Sandbox Code Playgroud)

对于多个并发请求,它会变得有点棘手,但也可以完成.

function limitConcurrency(fn, n) {
    if (n == 1) return queue(fn); // optimisation
    var q = null;
    var active = [];
    function next(x) {
        return function() {
            var p = fn(x)
            active.push(p.reflect().then(function() {
                active.splice(active.indexOf(p), 1);
            })
            return [Promise.race(active), p];
        }
    }
    function fst(t) {
        return t[0];
    }
    function snd(t) {
        return t[1];
    }
    return function(x) {
        var put = next(x)
        if (active.length < n) {
            var r = put()
            q = fst(t);
            return snd(t);
        } else {
            var r = q.then(put);
            q = r.then(fst);
            return r.then(snd)
        }
    };
}
Run Code Online (Sandbox Code Playgroud)

顺便说一下,你可能想看一下actor模型CSP.他们可以简化处理这些事情,也有一些JS库可供他们使用.

import Promise from 'bluebird'

function sequential(fn) {
  var q = Promise.resolve();
  return (...args) => {
    const p = q.then(() => fn(...args))
    q = p.reflect()
    return p
  }
}

async function _delayPromise (seconds, str) {
  console.log(`${str} started`)
  await Promise.delay(seconds)
  console.log(`${str} ended`)
  return str
}

let delayPromise = sequential(_delayPromise)

async function a() {
  await delayPromise(100, "a:a")
  await delayPromise(200, "a:b")
  await delayPromise(300, "a:c")
}

async function b() {
  await delayPromise(400, "b:a")
  await delayPromise(500, "b:b")
  await delayPromise(600, "b:c")
}

a().then(() => console.log('done'))
b().then(() => console.log('done'))

// --> with sequential()

// $ babel-node test/t.js
// a:a started
// a:a ended
// b:a started
// b:a ended
// a:b started
// a:b ended
// b:b started
// b:b ended
// a:c started
// a:c ended
// b:c started
// done
// b:c ended
// done

// --> without calling sequential()

// $ babel-node test/t.js
// a:a started
// b:a started
// a:a ended
// a:b started
// a:b ended
// a:c started
// b:a ended
// b:b started
// a:c ended
// done
// b:b ended
// b:c started
// b:c ended
// done
Run Code Online (Sandbox Code Playgroud)