有没有办法用 RxJS 管理并发?

Die*_*ego 5 node.js rxjs

TL;DR - 我正在寻找一种方法来控制 HTTP 请求并发连接到 REST API 的数量,同时我使用 RxJS。

我的 Node.js 应用程序将向第三方提供商发出几千次 REST API 调用。但是,我知道如果我一次发出所有这些请求,服务可能会因为 DDoS 攻击而关闭或拒绝我的请求。所以,我想在任何给定时间设置最大并发连接数。我曾经通过利用Throat Package使用 Promises 实现并发控制,但我还没有找到类似的方法来实现这一点。

我尝试merge按照这篇文章中的建议使用with 1 进行并发如何限制 flatMap 的并发?,但所有请求都是一次性发送的。

这是我的代码:

var Rx = require('rx'),
  rp = require('request-promise');

var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3'
];

var source = Rx.Observable.fromArray(array).map(httpGet).merge(1);

function httpGet(url) {
  return rp.get(url);
}

var results = [];
var subscription = source.subscribe(
  function (x) {
    console.log('=====', x, '======');
  },
  function (err) {
    console.log('Error: ' + err);
  },
  function () {
    console.log('Completed');
  });
Run Code Online (Sandbox Code Playgroud)

car*_*ant 5

您可以使用mergeMap操作符来执行 HTTP 请求并将响应扁平化为组合的 observable。mergeMap接受一个可选concurrent参数,您可以使用该参数指定并发订阅的 observables(即 HTTP 请求)的最大数量:

let source = Rx.Observable
  .fromArray(array)
  .mergeMap(httpGet, 1);
Run Code Online (Sandbox Code Playgroud)

需要注意的是一个mergeMapconcurrent指定为1等同于concatMap

您问题中的代码一次发送所有请求的原因归结为httpGetmap运算符中调用您的函数。httpGet返回一个 Promise 并且 Promise 不是懒惰的 - 一旦httpGet被调用,请求就会被发送。

使用上面的代码,httpGet只有在mergeMap并发请求数少于指定数量时才会在实现中调用。

上面的代码将与组合的 observable 分开发出每个响应。如果您希望将响应组合成一个在所有请求完成时发出的数组,您可以使用toArray运算符:

let source = Rx.Observable
  .fromArray(array)
  .mergeMap(httpGet, 1)
  .toArray();
Run Code Online (Sandbox Code Playgroud)

您还应该查看 Martin 在评论中引用的食谱。


Die*_*ego 2

感谢以上各位的回复。我的问题与使用rx而不是rxjs NPM 模块有关。在我卸载 rx 并安装 rxjs 后,所有示例都开始按预期使用并发。因此,使用 Promises、Callbacks 和 Native Observables 的 http 并发调用工作得很好。

我将它们发布在这里,以防有人遇到类似问题并可以进行故障排除。

基于 HTTP 请求回调的示例:

var Rx = require('rxjs'),
  request = require('request'),
  request_rx = Rx.Observable.bindCallback(request.get);

var array = [
  'https://httpbin.org/ip', 
  'https://httpbin.org/user-agent',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3'
];

var source = Rx.Observable.from(array).mergeMap(httpGet, 1);

function httpGet(url) {
  return request_rx(url);
}

var subscription = source.subscribe(
  function (x, body) {
    console.log('=====', x[1].body, '======');
  },
  function (err) {
    console.log('Error: ' + err);
  },
  function () {
    console.log('Completed');
  });
Run Code Online (Sandbox Code Playgroud)

基于承诺的示例:

var Rx = require('rxjs'),
  rp = require('request-promise');

var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3'
];

var source = Rx.Observable.from(array).mergeMap(httpGet, 1);

function httpGet(url) {
  return rp.get(url);
}

var results = [];
var subscription = source.subscribe(
  function (x) {
    console.log('=====', x, '======');
  },
  function (err) {
    console.log('Error: ' + err);
  },
  function () {
    console.log('Completed');
  });
Run Code Online (Sandbox Code Playgroud)

原生 RxJS 示例:

var Rx = require('rxjs'),
  superagent = require('superagent'),
  Observable = require('rxjs').Observable;

var array = [
  'https://httpbin.org/ip', 
  'https://httpbin.org/user-agent',
  'https://httpbin.org/delay/10',
  'https://httpbin.org/delay/2',
  'https://httpbin.org/delay/2',
  'https://httpbin.org/delay/1',
];

let start = (new Date()).getTime();

var source = Rx.Observable.from(array)
    .mergeMap(httpGet, null, 1)
    .timestamp()
    .map(stamp => [stamp.timestamp - start, stamp.value]);

function httpGet(apiUrl) {
  return Observable.create((observer) => {
    superagent
        .get(apiUrl)
        .end((err, res) => {
            if (err) {
                return observer.onError(err);
            }
            let data,
                inspiration;
            data = JSON.parse(res.text);
            inspiration = data;
            observer.next(inspiration);
            observer.complete();
        });
    });
}

var subscription = source.subscribe(
  function (x) {
    console.log('=====', x, '======');
  });
Run Code Online (Sandbox Code Playgroud)