使用 Promise.all 进行异步等待和瓶颈速率限制

Emr*_*lan 2 node.js promise async-await intercom

我使用的 API 的速率限制为 500 个请求/分钟。因此我决定使用瓶颈。但我需要执行异步函数数组,它会生成 Promise 来进行 API 调用。我不确定我是否走对了路。因为 API 回复我“在 10 秒内超出了 83 个速率限制”,而我在 10 秒内只发送了 70 个请求。

这是我调用 main 函数的方式:

const result = await Helper.updateUsers(request.query.where);
..
..
Run Code Online (Sandbox Code Playgroud)

这是helper.js

const Boom = require("boom");
const mongoose = require("mongoose");
const Bottleneck = require("bottleneck");

const Intercom = require("intercom-client");

const config = require("../../config/config");

const client = new Intercom.Client({
  token: config.intercom.access_token
});

const User = mongoose.model("User");
const Shop = mongoose.model("Shop");

// create a rate limiter that allows up to 70 API calls per 10 seconds,
// with max concurrency of 70
const limiter = new Bottleneck({
  maxConcurrent: 70,
  minTime: 10000
});

// Helpers

// This function prepares a valid Intercom User Object.
// user -> User Object
// returns <Promise>
const prepareAndUpdateUser = async user => {
  try {
    let userData = {
      email: user.email,
      user_id: user._id,
      companies: []
    };
    Shop.find({ _id: { $in: user.account.shops } })
      .exec((err, shops) => {
        if (err) console.log("INTERCOM UPDATE USER", err);
        shops.forEach(shop => {
          let shopData = {
            company_id: shop._id,
            name: shop.name[shop.defaultLanguage.code]
          };
          userData.companies.push(shopData);
        });
        // Update Intercom Promise
        return client.users.create(userData);
      });
  } catch (e) {
    return Boom.boomify(err);
  }
};

module.exports.updateUsers = async query => {
  try {
    const users = await User.find(query)
      .populate("account")
      .limit(700);
    if (users && users.length > 0) {
      limiter.schedule(() => {
        const allTasks = users.map(
          async user => await prepareAndUpdateUser(user)
        );
        return Promise.all(allTasks);
      });
      return users.length;
    } else {
      return 0;
    }
  } catch (err) {
    return Boom.boomify(err);
  }
};
Run Code Online (Sandbox Code Playgroud)

我是否正确使用了 Bottleneck 和 Async-Await?

Jak*_*ger 6

首先要指出的是,您在async方法中使用回调而不是awaitPromise。您应该使用承诺返回版本Shops.find()await结果。

async function prepareAndUpdateUser(user) {
    try {
        const shops = await Shop.find({ _id: { $in: user.account.shops } }).exec();
        return client.users.create({
            email: user.email,
            user_id: user._id,
            companies: shops.map(shop => {
                return {
                    company_id: shop._id,
                    name: shop.name[shop.defaultLanguage.code]
                };
            })
        });
    } catch (e) {
        return Boom.boomify(err);
    }
}
Run Code Online (Sandbox Code Playgroud)

在您的updateUsers方法中,您向后使用速率限制器。您希望将用户映射到速率限制器,以便它可以控制何时prepareAndUpdateUser被调用,目前您将并行请求所有内容。您还需要等待速率限制器返回的承诺来解决。本质上你会想limiter.scehdule(...)搬进user.map(...).

async function updateUsers(query) {
    try {
        const users = await User.find(query)
            .populate("account")
            .limit(700);
        if (users && users.length > 0) {
            // Schedule an update for each user
            const allTasks = users.map(user => {
                // Schedule returns a promise that resolves when the operation is complete
                return limiter.schedule(() => {
                    // This method is called when the scheduler is ready for it
                    return prepareAndUpdateUser(user)
                })
            });
            // Wait for all the scheduled tasks to complete
            await Promise.all(allTasks);
            return users.length;
        } else {
            return 0;
        }
    } catch (err) {
        return Boom.boomify(err);
    }
}
Run Code Online (Sandbox Code Playgroud)