Ser*_*lla 6 javascript asynchronous system.reactive reactive-extensions-js rxjs
我希望以下代码将异步运行:
var range = Rx.Observable.range(0, 3000000);
range.subscribe(
function(x) {},
function(err) {},
function() {
console.log('Completed');
});
console.log('Hello World');
Run Code Online (Sandbox Code Playgroud)
但事实并非如此.需要一段时间才能完成大范围的数字,只有当它完成后才能恢复执行,你可以在这里尝试代码.
我很困惑何时期望RxJS同步或异步地运行.它取决于使用的方法吗?我之前的想法是,一旦我们进入Observables/Observer土地,其中的所有内容都是异步运行的,类似于承诺的工作方式.
Bra*_*don 21
RxJ遵循与Rx.Net相同的规则.默认情况下,每个可观察操作符使用执行其工作所需的最小异步数量.在这种情况下,Range可以同步运行数字,因此它(它的文档告诉你它将Rx.Scheduler.currentThread默认使用).
如果要引入比操作所需的更多异步性,则需要告诉它使用不同的Scheduler.
要获得您期望的行为,您需要使用Rx.Scheduler.timeout.实质上,这将使它通过计划每次迭代setTimeout.(实际上并非如此简单,调度程序将使用浏览器中可用的最快方法来安排延迟工作).
var range = Rx.Observable.range(0, 3000000, Rx.Scheduler.timeout);
Run Code Online (Sandbox Code Playgroud)
请注意,迭代300万个数字setTimeout将几乎永远.所以也许我们想要分批处理它们.所以在这里我们将利用默认行为Range同步运行,然后批量处理并使用observeOn通过我们的超时调度程序运行批处理:
var range = Rx.Observable
.range(0, 3000000)
.bufferWithCount(1000)
.observeOn(Rx.Scheduler.timeout) // run each buffer via setTimeout
.select(function (buffer, i) {
console.log("processing buffer", i);
return Rx.Observable.fromArray(buffer);
})
.concatAll(); // concat the buffers together
Run Code Online (Sandbox Code Playgroud)
jsFiddle请注意,在开始时有一个延迟,同时range通过所有3,000,000个值并bufferWithCount产生3,000个阵列.对于真正的生产代码来说,这种东西是不寻常的,因为你的数据源不是那么简单Observable.range.
在这方面,FYI承诺没有任何不同.如果您调用then已完成的promise,则该then函数可能会同步运行.所有Promises和Observable实际上都提供了一个接口,通过该接口,您可以提供在满足条件时保证运行的回调,条件是否已满足或稍后是否满足.然后,RxJs提供了许多机制来强制某些东西以异步方式运行,如果你真的想要那样的话.以及介绍具体时间的方法.
| 归档时间: |
|
| 查看次数: |
5084 次 |
| 最近记录: |