了解rxjs中的反压 - 只缓存5个图像等待上传

Roa*_*ers 5 javascript reactive-programming node.js rxjs rxjs5

我正在研究一个需要提交数千张图像进行处理的节点项目.在将这些图像上传到处理服务器之前,需要调整它们的大小,以便我有以下几点:

imageList
    .map(image => loadAndResizeImage)
    .merge(3)
    .map(image => uploadImage)
    .merge(3)
    .subscribe();
Run Code Online (Sandbox Code Playgroud)

图像大小调整通常需要十分之几秒,上传和处理大约需要4秒.

当我等待上传队列清除时,如何防止在内存中累积数千个已调整大小的图像?我可能希望调整5张图像大小并等待,这样一旦图像上传完成,下一个已调整大小的图像就会从队列中拉出并上传,新图像会被调整大小并添加到"缓冲区"中.

可以在此处找到该问题的说明:

https://jsbin.com/webaleduka/4/edit?js,console

这里有一个加载步骤(耗时200ms)和一个处理步骤(耗时4秒).每个进程限制为2的并发性.我们可以看到,在25个初始项目中,我们在内存中获得了20个图像.

我确实看过缓冲区选项,但似乎都没有做我想做的事情.

目前我刚刚将负载,调整大小和上传到一个延迟的observable,我合并了最大并发.我想让图像等待上传,我相信它一定是可能的.

我正在使用RXjs 4,但我想5的主体将是相同的.

非常感谢.

Roa*_*ers 0

我认为我已经通过使用controlled()rxjs 运算符成功解决了这个问题:

var queuedImages = 0;

var imageSource = Rx.Observable.range(1, 25)
  .map(index => "image_" + index)
  .controlled();

imageSource
  .map(image => loadImage(image))
  .merge(2)
  .do((image) => {
    queuedImages++;
    console.log(`Images waiting for processing: ${queuedImages}`);
  })
  .map(image => processImage(image))
  .merge(2)
  .do( () => {
    queuedImages--;
    console.log(`Images waiting for processing: ${queuedImages}`);

    if(queuedImages < 4){
      console.log(`requesting more image loads`);
      imageSource.request(4-queuedImages);
    }
  })
  .subscribe( 
    (item) => {}, null, 
    () => console.log(`All Complete`) );

imageSource.request(4);
Run Code Online (Sandbox Code Playgroud)

最初请求 4 张图像。这些是从光盘加载然后进行处理的。当加载并处理每个图像时,使用该queuedImages变量来跟踪内存中图像的数量。当该数字低于 4 时,会请求更多图像。

可以在这里看到这个的 jsbin:

https://jsbin.com/webaleduka/11/edit?js,console

此方法意味着缓存中的图像永远不会超过 6 个左右,并确保缓存中始终有足够的图像等待上传。