我有正常方式消耗的RxJS序列......
但是,在可观察的'onNext'处理程序中,某些操作将同步完成,但其他操作需要异步回调,需要在处理输入序列中的下一个项之前等待.
......有点困惑如何做到这一点.有任何想法吗?谢谢!
someObservable.subscribe(
function onNext(item)
{
if (item == 'do-something-async-and-wait-for-completion')
{
setTimeout(
function()
{
console.log('okay, we can continue');
}
, 5000
);
}
else
{
// do something synchronously and keep on going immediately
console.log('ready to go!!!');
}
},
function onError(error)
{
console.log('error');
},
function onComplete()
{
console.log('complete');
}
);
Run Code Online (Sandbox Code Playgroud) 我正在尝试将现有的API转换为与RxJS一起使用...对于节点来说是相当新的,对RxJ来说是非常新的,所以请耐心等待.
我有一个现有的API(getNextMessage),它阻止(异步),或者当某些东西变得可用时,通过节点样式(错误,val)回调返回一个新项或错误.
所以它看起来像:
getNextMessage(nodeStyleCompletionCallback);
您可以将getNextMessage视为一个http请求,将在服务器响应时完成,但是一旦收到消息,您需要再次调用getNextMessage,以便不断从服务器获取新项目.
因此,为了使它成为一个可观察的集合,我必须让RxJs继续调用我的getNextMessage函数,直到订阅者被处置();
基本上,我正在尝试创建自己的RxJs可观察集合.
问题是:
会有点帮助,谢谢!
这是我现有的代码:
var Rx = require('rx');
var port = require('../lib/port');
var async = require('async');
function observableReceive(portName)
{
var observerCallback;
var listenPort = new port(portName);
var disposed = false;
var asyncReceive = function(asyncCallback)
{
listenPort.getNextMessage(
function(error, json)
{
observerCallback(error, json);
if (!disposed)
setImmediate(asyncCallback);
}
);
}
return function(outerCallback)
{
observerCallback = outerCallback;
async.forever(asyncReceive);
}
}
var receive = Rx.Observable.fromNodeCallback(observableReceive('rxtest'));
var source = receive();
var subscription = …Run Code Online (Sandbox Code Playgroud)