小编use*_*110的帖子

等待RxJS Observable的onNext中的异步操作

我有正常方式消耗的RxJS序列......

但是,在可观察的'onNext'处理程序中,某些操作将同步完成,但其他操作需要异步回调,需要在处理输入序列中的下一个项之前等待.

......有点困惑如何做到这一点.有任何想法吗?谢谢!

someObservable.subscribe(
    function onNext(item)
    {
        if (item == 'do-something-async-and-wait-for-completion')
        {
            setTimeout(
                function()
                {
                    console.log('okay, we can continue');
                }
                , 5000
            );
        }
        else
        {
            // do something synchronously and keep on going immediately
            console.log('ready to go!!!');
        }
    },
    function onError(error)
    {
        console.log('error');
    },
    function onComplete()
    {
        console.log('complete');
    }
);
Run Code Online (Sandbox Code Playgroud)

javascript asynchronous rxjs

29
推荐指数
2
解决办法
2万
查看次数

试图使我自己的RxJs可观察

我正在尝试将现有的API转换为与RxJS一起使用...对于节点来说是相当新的,对RxJ来说是非常新的,所以请耐心等待.

我有一个现有的API(getNextMessage),它阻止(异步),或者当某些东西变得可用时,通过节点样式(错误,val)回调返回一个新项或错误.

所以它看起来像:

getNextMessage(nodeStyleCompletionCallback);

您可以将getNextMessage视为一个http请求,将在服务器响应时完成,但是一旦收到消息,您需要再次调用getNextMessage,以便不断从服务器获取新项目.

因此,为了使它成为一个可观察的集合,我必须让RxJs继续调用我的getNextMessage函数,直到订阅者被处置();

基本上,我正在尝试创建自己的RxJs可观察集合.

问题是:

  1. 我不知道如何使subscriber.dispose()杀死async.forever
  2. 我可能不应该首先使用async.forever
  3. 我不确定我是否应该为每条消息"完成" - 不应该在序列的最后
  4. 我想最终删除使用fromNodeCallback的需要,以获得第一个类RxJS可观察
  5. 显然我有点困惑.

会有点帮助,谢谢!

这是我现有的代码:

var Rx = require('rx');
var port = require('../lib/port');
var async = require('async');

function observableReceive(portName)
{
    var observerCallback;
    var listenPort = new port(portName);
    var disposed = false;

    var asyncReceive = function(asyncCallback)
    {
        listenPort.getNextMessage(
            function(error, json)
            {
                observerCallback(error, json);

                if (!disposed)
                    setImmediate(asyncCallback);
            }
        );
    }

    return function(outerCallback)
    {
        observerCallback = outerCallback;
        async.forever(asyncReceive);
    }
}

var receive = Rx.Observable.fromNodeCallback(observableReceive('rxtest'));
var source = receive();

var subscription = …
Run Code Online (Sandbox Code Playgroud)

node.js rxjs

5
推荐指数
2
解决办法
8918
查看次数

标签 统计

rxjs ×2

asynchronous ×1

javascript ×1

node.js ×1