异步生成器在解决时产生承诺结果

Don*_*tch 1 javascript generator fetch promise async-await

假设我想同时获取 10 个 url,并在收到响应时对其进行处理(其顺序可能与它们在原始列表中出现的顺序不同)。忽略拒绝的可能性,一种方法是简单地为每个 Promise 附加一个“then”回调,然后等待它们全部使用完成Promise.all()

const fetch_promises = [
  fetch("https://cors-demo.glitch.me/allow-cors"),
  fetch("/"),
  fetch("."),
  fetch(""),
  fetch("https://enable-cors.org"),
  fetch("https://html5rocks-cors.s3-website-us-east-1.amazonaws.com/index.html"),
  fetch("https://api.github.com"),
  fetch("https://api.flickr.com/services/rest/"),
];
const processing_promises = [];
for (const fetch_promise of fetch_promises) {
  processing_promises.push(fetch_promise.then(response => {
    // Process response.  In this example, that means just
    // print it.
    console.log("got a response: ",response);
  }));
}
await Promise.all(processing_promises);
Run Code Online (Sandbox Code Playgroud)

切换到输出更清晰、更具确定性的示例:

const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
    sleep(3000).then(()=>"slept 3000"),
    sleep(1000).then(()=>"slept 1000"),
    sleep(5000).then(()=>"slept 5000"),
    sleep(4000).then(()=>"slept 4000"),
    sleep(2000).then(()=>"slept 2000"),
];
const processing_promises = [];
for (const sleep_promise of sleep_promises) {
  processing_promises.push(sleep_promise.then(result => {
     console.log("promise resolved: ",result);
  }));
}
await Promise.all(processing_promises);
Run Code Online (Sandbox Code Playgroud)

输出如预期:

15:54:16.331 promise resolved:  slept 1000
15:54:17.331 promise resolved:  slept 2000
15:54:18.331 promise resolved:  slept 3000
15:54:19.332 promise resolved:  slept 4000
15:54:20.331 promise resolved:  slept 5000
Run Code Online (Sandbox Code Playgroud)

我的问题是这样的:假设我想要或需要将上述处理表示为“async for..of”循环,而不是“then”回调;因此,promise 结果需要以异步可迭代的形式出现。我如何将承诺数组转换为这样的异步可迭代对象?我要求的是一个异步生成器函数 AwaitAs TheyCome(),将承诺列表作为输入,当承诺解决时,它会一一产生结果。然后我调用该函数并进行处理,如下所示:

for await (const result of AwaitAsTheyCome(sleep_promises)) {
 console.log("promise resolved: ",result);
}
Run Code Online (Sandbox Code Playgroud)

它应该给出与上面相同的输出(具有相同的时序)。

以下尝试的解决方案显然不起作用,但它可能会让我了解我期望它是多么简单和简短:

async function* AwaitAsTheyCome(promises) {
  for (const promise of promises) {
    promise.then(response => {
      yield response;  // WRONG
      // I want to yield it from AwaitAsTheyCome,
      // not from the current arrow function!
    });
  }
}
Run Code Online (Sandbox Code Playgroud)

以下解决方案确实有效,但它的代码比我预期为此编写的代码要多。

async function* AwaitAsTheyCome(promises) {
  // Make a list of notifier promises and
  // functions that resolve those promises,
  // one for each of the original promises.
  const notifier_promises = [];
  const notifier_resolves = [];
  for (const promise of promises) {
    notifier_promises.push(
        new Promise(resolve=>notifier_resolves.push(resolve)));
  }

  const responses = [];
  for (const promise of promises) {
    promise.then(response => {
      responses.push(response);
      // send one notification (i.e. resolve the next notifier promise)
      notifier_resolves.shift()();
    });
  }

  for (const promise of promises) {
    // wait for one notification
    // (i.e. wait for the next notifier promise to be resolved).
    await notifier_promises.shift();
    // yield the corresponding response
    yield responses.shift();
  }
}

// Example/test usage
const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
  sleep(3000).then(()=>"slept 3000"),
  sleep(1000).then(()=>"slept 1000"),
  sleep(5000).then(()=>"slept 5000"),
  sleep(4000).then(()=>"slept 4000"),
  sleep(2000).then(()=>"slept 2000"),
];
for await (const result of AwaitAsTheyCome(sleep_promises)) {
 console.log("promise resolved: ",result);
}
Run Code Online (Sandbox Code Playgroud)

有没有更简单的方法来实现异步生成器函数 AwaitAs TheyCome?

(我尝试用上面的代码制作一个 stacksnippet,但它不起作用——我怀疑这是因为代码片段系统不理解新的异步生成器和/或for await..of语法)

Ber*_*rgi 5

您可以通过以下方式稍微简化代码

\n
    \n
  • 仅对输入数组使用单个循环(尽管这可能会令人困惑)
  • \n
  • 不使用responses数组,而只是履行承诺
  • \n
  • 不使用.shift()Promise 数组,而只是循环它
  • \n
\n\n
async function* raceAll(input) {\n  const promises = [];\n  const resolvers = [];\n  for (const p of input) {\n    promises.push(new Promise(resolve=> {\n      resolvers.push(resolve);\n    }));\n    p.then(result => {\n      resolvers.shift()(result);\n    });\n  }\n\n  for (const promise of promises) {\n    yield promise;\n  }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

如果您不喜欢所需的代码量,我建议将其在单独的模块中实现的队列分解出来。通过这个实现,代码可以变得像这样简单

\n
function raceAll(promises) {\n  const queue = new AsyncBlockingQueue();\n  for (const p of promises) {\n    p.then(result => {\n      queue.enqueue(result); \n    });\n  }\n  return queue[Symbol.asyncIterator]();\n}\n
Run Code Online (Sandbox Code Playgroud)\n

然而,这两种实现都忽略了一个关键问题:错误处理。如果这些承诺中的任何一个被拒绝,您将收到未处理的拒绝错误,这可能会导致您的进程崩溃。要真正让异步迭代器拒绝下一个承诺,以便循环周围的try/可以处理它,您需要执行类似的操作catchfor await\xe2\x80\xa6of

\n
async function* raceAll(input) {\n  const promises = [];\n  const resolvers = [];\n  for (const p of input) {\n    promises.push(new Promise(resolve => {\n      resolvers.push(resolve);\n    }));\n    p.finally(() => {\n      resolvers.shift()(p);\n    });\n    // works equivalent to:\n    // p.then(result => {\n    //   resolvers.shift()(result);\n    // }, error => {\n    //   resolvers.shift()(Promise.reject(error));\n    // });\n  }\n\n  for (const promise of promises) {\n    yield promise;\n  }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

用被拒绝的 Promise 来解析 Promise 可以解决这个问题,这样我们仍然只需要一个resolver函数队列,而不是同时包含resolvereject函数的队列。

\n