相关疑难解决方法(0)

我如何使用RxJ来阻止任何AJAX调用请求,直到前一个调用结束

我有一个observable,它代表一个由一些外部组件触发的动作.出于这个问题的目的,我们称之为createBananaAction.我有一个bananaService方法create执行一个AJAX请求并返回创建的香蕉作为Promise.

因此,每当有一些数据从createBananaAction我们到达时,我们都想打电话bananaService.create().

代码如下所示:(使用RxJs)

this.createdBananas = createBananaAction.flatMap(() => bananaService.create());
Run Code Online (Sandbox Code Playgroud)

现在,挑战是"限制"createBananaAction,以便它只能在收到前一个香蕉后请求另一个香蕉.简单地说:永远不会有两个同时来电bananaService.create().请注意,我不想及时限制,而是在bananaService执行其操作时忽略所有传入的新香蕉请求.

我做了一些研究,找到了看似合适的pausable操作员.

我的代码现在看起来像这样:

const pausableCreateBananaAction = createBananaAction.pausable();

this.createdBananas = pausableCreateBananaAction
    .do(() => pausableCreateBananaAction.pause())
    .flatMap(() => bananaService.create())
    .do(() => pausableCreateBananaAction.resume());
Run Code Online (Sandbox Code Playgroud)

这似乎有效,但我不喜欢我需要这些do语句来手动触发pauseresume语句的事实.

我发现,你可以通过一个可观察到pausable那么应该产生falsetrue在适当的时间,但是这也需要我在受到手动推值.像这样的东西:

const letItGoThrough = new Rx.Subject();

this.createdBananas = createBananaAction
    .pausable(letItGoThrough.startWith(true))
    .do(() => letItGoThrough.onNext(false))
    .flatMap(() => bananaService.create())
    .do(() => letItGoThrough.onNext(true));
Run Code Online (Sandbox Code Playgroud)

所以现在我有了一个Rx.Subject(主题就像是RxJs的训练轮,你使用的是直到你在RxJs上经验不足而你不再需要它们.)和两次调用do. …

javascript throttling reactive-programming rxjs

7
推荐指数
1
解决办法
1201
查看次数

如何限制flatMap的并发性?

我正在尝试使用RxJS编写一个脚本来处理数百个日志文件,每个文件大约1GB.脚本的骨架看起来像

Rx.Observable.from(arrayOfLogFilePath)
.flatMap(function(logFilePath){
   return Rx.Node.fromReadStream(logFilePath)
   .filter(filterLogLine)
})
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)
Run Code Online (Sandbox Code Playgroud)

代码有效,但请注意所有日志文件的过滤步骤将同时启动.但是,从文件系统IO性能的角度来看,最好一个接一个地处理一个文件(或者至少将并发限制为几个文件而不是同时打开所有数百个文件).在这方面,我如何以"功能反应方式"实施?

我曾想过调度程序,但无法弄清楚它在这里有什么用处.

javascript rxjs

6
推荐指数
1
解决办法
4488
查看次数

并行触发异步请求,但使用rxjs按顺序获取结果

例如:

使用jquery ajax在parrallel中获取5页。当page2返回时,什么也不做。当page1返回时,对page1和page2进行操作。

// assume there is some operator that can do this, 
// then it might look like this?
Rx.Observable.range(1, 5).
someOperator(function(page) {
  return Rx.Observable.defer( () => $.get(page) );
}).scan(function(preVal, curItem) {
  preVal.push(curItem);
  return preVal;
}, []);
Run Code Online (Sandbox Code Playgroud)

javascript rxjs

5
推荐指数
1
解决办法
2206
查看次数

使用RxJS进行批处理?

我猜这应该有点容易实现,但我遇到了麻烦(从概念上讲,我猜)找出如何解决它.

我所拥有的是一个返回JSON对象数组的API.我需要逐步浏览这些对象,并为每个对象进行另一个AJAX调用.问题是处理每个AJAX调用的系统一次只能处理两个活动调用(因为它是一个CPU密集型任务,可以挂钩到桌面应用程序中).

我想知道如何使用RxJS(使用版本5或4)来实现这一目标?

编辑:此外,是否可以同时运行一系列步骤.即

Downloading File: 1 Processing File: 1 Converting File: 1 Uploading File: 1 Downloading File: 2 Processing File: 2 Converting File: 2 Uploading File: 2 Downloading File: 3 Processing File: 3 Converting File: 3 Uploading File: 3

我尝试过这样的事情:

Rx.Observable.fromPromise(start())
    .concatMap(arr => Rx.Observable.from(arr))
    .concatMap(x => downloadFile(x))
    .concatMap((entry) => processFile(entry))
    .concatMap((entry) => convertFile(entry))
    .concatMap((entry) => UploadFile(entry))
    .subscribe(
        data => console.log('data', new Date().getTime(), data),
        error => logger.warn('err', error),
        complete => logger.info('complete')
    );
Run Code Online (Sandbox Code Playgroud)

然而,这似乎不起作用.例如,downloadFile不等待processFile,convertFile和uploadFile全部完成,而是下一个将在前一个完成后再次运行.

javascript rxjs rxjs5

5
推荐指数
1
解决办法
1733
查看次数

Angular 2限制并行http调用

我有一个 Angular 2 应用程序,它从服务器获取不同数量的 id,然后为每个 id 在forkJoin.

然而,id 的数量可能高达数百个,当突然并行进行数百个 REST 调用时,这可能会出现问题。

使用 RxJ 和运算符时是否有办法限制并行调用的数量forkJoin

rxjs angular

3
推荐指数
1
解决办法
3273
查看次数