JBC*_*BCP 6 javascript reactive-programming rxjs
我正在使用一个调用我实现的函数的框架.我希望将此函数的参数转换为Observable,并通过一系列Observers发送.我以为我可以使用一个主题,但它没有像我预期的那样表现.
为了澄清,我有类似下面的代码.我认为Option 1
下面会有效,但到目前为止我一直在努力Option 2
,这似乎不是惯用语.
var eventSubject = new Rx.Subject();
var resultSource = eventSubject.map(processEvent);
var subscription = resultSource.subscribe(
function(event) {
console.log("got event", event);
},
function(e) {
log.error(e);
},
function() {
console.log('eventSubject onCompleted');
}
);
// The framework calls this method
function onEvent(eventArray) {
var eventSource = Rx.Observable.from(eventArray);
// Option 1: I thought this would work, but it doesn't
// eventSource.subscribe(eventSubject);
// Option 2: This does work, but its obviously clunky
eventSource.subscribe(
function(event) {
log.debug("sending to subject");
eventSubject.onNext(event);
},
function(e) {
log.error(e);
},
function() {
console.log('eventSource onCompleted');
}
);
}
Run Code Online (Sandbox Code Playgroud)
只是当您将整个Subject
可观察对象订阅时,您最终将该onComplete
可观察对象的事件订阅给了主题。这意味着当可观察完成时,它将完成你的主题。因此,当您获得下一组事件时,它们不会执行任何操作,因为主题已经完成。
一种解决方案正是您所做的。只需将事件订阅onNext
到主题即可。
第二种解决方案,可以说更像“rx”,是将传入的数据视为可观察数据的可观察数据,并使用mergeAll压平此可观察数据流:
var eventSubject = new Rx.Subject(); // observes observable sequences
var resultSource = eventSubject
.mergeAll() // subscribe to each inner observable as it arrives
.map(processEvent);
var subscription = resultSource.subscribe(
function(event) {
console.log("got event", event);
},
function(e) {
log.error(e);
},
function() {
console.log('eventSubject onCompleted');
}
);
// The framework calls this method
function onEvent(eventArray) {
var eventSource = Rx.Observable.from(eventArray);
// send a new inner observable sequence
// into the subject
eventSubject.onNext(eventSource);
}
Run Code Online (Sandbox Code Playgroud)
更新:这是一个正在运行的示例