dre*_*dre 3 javascript http-live-streaming rxjs
我有一个来自EventEmitter的Observable,它实际上只是一个http连接,流媒体事件.
有时我必须断开与底层流的连接并重新连接.我不知道如何使用rxjs来处理这个问题.
我不确定我是否可以完成一个源,然后动态添加其他"源"到源,或者如果我必须做我喜欢的事情.
var Rx = require('rx'),
EventEmitter = require('events').EventEmitter;
var eventEmitter = new EventEmitter();
var eventEmitter2 = new EventEmitter();
var source = Rx.Observable.fromEvent(eventEmitter, 'data')
var subscription = source.subscribe(function (data) {
console.log('data: ' + data);
});
setInterval(function() {
eventEmitter.emit('data', 'foo');
}, 500);
// eventEmitter stop emitting data, underlying connection closed
// now attach seconds eventemitter (new connection)
// something like this but obvouisly doesn't work
source
.fromEvent(eventEmitter2, 'data')
Run Code Online (Sandbox Code Playgroud)
Puesdo代码更多的是我正在做的事情,我在关闭第一个之前创建第二个流连接,所以我不会"丢失"任何数据.在这里我不知道如何在没有"丢失"记录的情况下停止Observable,因为onNext由于缓冲而没有被调用.
var streams = [], notifiers = [];
// create initial stream
createNewStream();
setInterval(function() {
if (params of stream have changed) createNewStream();
}, $1minutes / 3);
function createNewStream() {
var stream = new eventEmitterStream();
stream.once('connected', function() {
stopOthers();
streams.push(stream);
createSource(stream, 'name', 'id');
});
}
function stopOthers() {
while(streams.length > 0) {
streams.pop().stop(); // stop the old stream
}
while(notifiers.length > 0) {
// if i call this, the buffer may lose records, before onNext() called
//notifiers.pop()(Rx.Notification.createOnCompleted());
}
}
function createObserver(tag) {
return Rx.Observer.create(
function (x) {
console.log('Next: ', tag, x.length, x[0], x[x.length-1]);
},
function (err) {
console.log('Error: ', tag, err);
},
function () {
console.log('Completed', tag);
});
}
function createSource(stream, event, id) {
var source = Rx.Observable
.fromEvent(stream, event)
.bufferWithTimeOrCount(time, max);
var subscription = source.subscribe(createObserver(id));
var notifier = subscription.toNotifier();
notifiers.push(notifier);
}
Run Code Online (Sandbox Code Playgroud)
首先是formost,你需要确保你可以从之前"死"的发射器中删除所有听众.否则你将创建一个漏洞的应用程序.
看起来你知道EventEmitter已经死亡的唯一方法是观察频率,除非你有一个错误或完成时触发的事件(断开连接).后者更受欢迎.
无论如何,Rx的秘诀是确保将您的数据流创建和拆解包装在您的可观察对象中.如果将发射器的创建包装在您的可观察对象中,以及将其拆除的方法,您将能够使用像retry运算符这样的令人敬畏的东西来重新创建该可观察对象.
因此,如果您无法知道它是否已经死亡,并且您想重新连接它,您可以使用以下内容:
// I'll presume you have some function to get an EventEmitter that
// is already set up
function getEmitter() {
var emitter = new EventEmitter();
setInterval(function(){
emitter.emit('data', 'foo');
}, 500)
return emitter;
}
var emitterObservable = Observable.create(function(observer) {
// setup the data stream
var emitter = getEmitter();
var handler = function(d) {
observer.onNext(d);
};
emitter.on('data', handler);
return function() {
// tear down the data stream in your disposal function
emitter.removeListener('on', handler);
};
});
// Now you can do Rx magic!
emitterObservable
// if it doesn't emit in 700ms, throw a timeout error
.timeout(700)
// catch all* errors and retry
// this means the emitter will be torn down and recreated
// if it times out!
.retry()
// do something with the values
.subscribe(function(x) { console.log(x); });
Run Code Online (Sandbox Code Playgroud)
*注意:重试会捕获所有错误,因此您可能需要在其catch上方添加一个以处理非超时错误.由你决定.
| 归档时间: |
|
| 查看次数: |
2453 次 |
| 最近记录: |