der*_*ink 8 javascript reactive-programming reactive-extensions-js rxjs
我在使用递归的可观察链时遇到了一些麻烦.
我正在使用RxJS,它目前在1.0.10621版本中,并且包含最基本的Rx功能,与jx的Rx一起使用.
让我为我的问题介绍一个示例场景:我正在轮询Twitter搜索API(JSON响应)以查找包含特定关键字的推文/更新.响应还包括"refresh_url",用于生成后续请求.对该后续请求的响应将再次包含新的refresh_url等.
Rx.jQuery允许我使Twitter搜索API调用一个可观察的事件,它产生一个onNext,然后完成.到目前为止我所尝试的是让onNext处理程序记住refresh_url并在onCompleted处理程序中使用它来为下一个请求生成一个新的observable和相应的观察者.这样,一个可观察的+观察者对无限期地跟随另一个.
这种方法的问题是:
当他们的前任尚未被处置时,后续的可观察/观察者已经活着.
我必须做很多令人讨厌的簿记,以保持对当前生活观察者的有效参考,其实际上可能有两个.(一个在onCompleted,另一个在其生命周期的其他地方)当然,这个参考需要取消订阅/处置观察者.记账的另一种方法是通过"仍在运行?" - 布尔值来实现副作用,正如我在我的例子中所做的那样.
示例代码:
running = true;
twitterUrl = "http://search.twitter.com/search.json";
twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
twitterMaxId = 0; //actually twitter ignores its since_id parameter
newTweetObserver = function () {
return Rx.Observer.create(
function (tweet) {
if (tweet.id > twitterMaxId) {
twitterMaxId = tweet.id;
displayTweet(tweet);
}
}
);
}
createTwitterObserver = function() {
twitterObserver = Rx.Observer.create(
function (response) {
if (response.textStatus == "success") {
var data = response.data;
if (data.error == undefined) {
twitterQuery = data.refresh_url;
var tweetObservable;
tweetObservable = Rx.Observable.fromArray(data.results.reverse());
tweetObservable.subscribe(newTweetObserver());
}
}
},
function(error) { alert(error); },
function () {
//create and listen to new observer that includes a delay
if (running) {
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
twitterObservable.subscribe(createTwitterObserver());
}
}
);
return twitterObserver;
}
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
twitterObservable.subscribe(createTwitterObserver());
Run Code Online (Sandbox Code Playgroud)
不要被请求到推文的双层observable /观察者所迷惑.我的例子主要关注第一层:从Twitter请求数据.如果在解决这个问题时,第二层(将响应转换为推文)可以与第一层成为一体,这将是非常棒的; 但我认为这是完全不同的事情.目前.
Erik Meijer向我指出了Expand运算符(参见下面的示例),并建议使用Join模式作为替代方案.
var ys = Observable.Expand
(new[]{0}.ToObservable() // initial sequence
, i => ( i == 10 ? Observable.Empty<int>() // terminate
: new[]{i+1}.ToObservable() // recurse
)
);
ys.ToArray().Select(a => string.Join(",", a)).DumpLive();
Run Code Online (Sandbox Code Playgroud)
这应该可以复制到LINQPad中.它假设单个可观察量并产生一个最终观察者.
所以我的问题是:如何在RxJS中做最好的扩展技巧?
编辑:
扩展运算符可能如此线程中所示实现.但是需要生成器(我只有JS <1.6).
不幸的是,RxJS 2.0.20304-beta没有实现Extend方法.
因此,我将尝试以与您稍有不同的方式解决您的问题,并采取一些您可以更轻松解决的自由。
所以我不能告诉你的一件事是你是否正在尝试执行以下步骤
或者是否有用户操作(获取更多/滚动到底部)。不管怎样,这确实是同样的问题。不过,我可能错误地阅读了你的问题。这是这个问题的答案。
function getMyTweets(headUrl) {
return Rx.Observable.create(function(observer) {
innerRequest(headUrl);
function innerRequest(url) {
var next = '';
// Some magic get ajax function
Rx.get(url).subscribe(function(res) {
observer.onNext(res);
next = res.refresh_url;
},
function() {
// Some sweet handling code
// Perhaps get head?
},
function() {
innerRequest(next);
});
}
});
}
Run Code Online (Sandbox Code Playgroud)
这可能不是您想要的答案。如果没有,抱歉!
编辑:查看代码后,看起来您想将结果作为数组并观察它。
// From the results perform a select then a merge (if ordering does not matter).
getMyTweets('url')
.selectMany(function(data) {
return Rx.Observable.fromArray(data.results.reverse());
});
// Ensures ordering
getMyTweets('url')
.select(function(data) {
return Rx.Observable.fromArray(data.results.reverse());
})
.concat();
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4037 次 |
| 最近记录: |