很长一段时间,我现在试图绕着RX包裹.而且,说实话,我不确定我是否得到它 - 或者不是.
今天,我在http://reactive-extensions.github.com/RxJS/上找到了一个解释- 在我看来 - 这太可怕了.它说:
RxJS是承诺异步的事件.
大.这是一个如此复杂的句子,如果你对RX的含义一无所知,在那句话之后,你就像以前一样愚蠢.
这基本上是我的问题:你发现的关于RX的常见地方的所有解释(至少我)都感到愚蠢.他们将RX解释为一个高度复杂的概念,包含许多高度复杂的单词和术语,我不知道它是什么.
所以我的问题是:你怎么解释RX给五岁的人?我想要一个清晰,风景如画的解释,它是什么,它有什么好处,它的主要概念是什么?
是否有可能在多个其他可观测量中分割出单个可观察通量?
我的用例是用户可以提交的表单.提交操作在可观察的情况下处理,并且在此操作中,有一个验证器正在侦听.
submitAction.forEach(validate)
Run Code Online (Sandbox Code Playgroud)
问题是我想将动作绑定到验证器检查success或者failure验证器检查.
validationFailure.forEach(outputErrors)
validationSuccess.forEach(goToPage)
Run Code Online (Sandbox Code Playgroud)
我不确定在反应式编程中如何处理类似的情况 - 分裂可观察性可能不是处理这类问题的正确解决方案.
无论如何,你会如何处理类似的案件?
处理允许流不终止的RxJS(或任何其他RX实现)中的错误的惯用方法是什么?
相关代码是
function convert(unit, value) {
var request = {};
request[unit] = value;
var conversion = $.ajax({
method: 'POST',
url: './convert.php',
data: request,
dataType: 'json'
}).promise();
return Rx.Observable.fromPromise(conversion).takeUntil(inInput.merge(cmInput));
}
var cmElement = document.getElementById('cm'),
inElement = document.getElementById('in');
var cmInput = Rx.Observable.fromEvent(cmElement, 'input').map(targetValue),
inInput = Rx.Observable.fromEvent(inElement, 'input').map(targetValue);
var inches = cmInput
.flatMap(convert.bind(null, 'cm'))
.startWith(0);
var centimeters = inInput
.flatMap(convert.bind(null, 'in'))
.startWith(0);
Run Code Online (Sandbox Code Playgroud)
因此,您可以看到我们使用输入字段更改流convert并将其传递给将其转换为另一个单元的函数,并进一步传递结果.
如果在$.ajax()调用期间发生错误,则它会向上传播,整个inches或cetimeters流停止(实际上是预期的).
但是如何实现它呢?
这样我就可以优雅地处理错误,比如显示错误消息,并在新数据到达时再试一次?
我目前的想法是引入像Haskell这样的复合类型Data.Either …
我有一个服务,它返回页面中的数据.对一个页面的响应包含有关如何查询下一页的详细信息.
我的方法是返回响应数据,然后如果有更多页面可用,则立即将延迟调用连接到相同的可观察序列.
function getPageFromServer(index) {
// return dummy data for testcase
return {nextpage:index+1, data:[1,2,3]};
}
function getPagedItems(index) {
return Observable.return(getPageFromServer(index))
.flatMap(function(response) {
if (response.nextpage !== null) {
return Observable.fromArray(response.data)
.concat(Observable.defer(function() {return getPagedItems(response.nextpage);}));
}
return Observable.fromArray(response.data);
});
}
getPagedItems(0).subscribe(
function(item) {
console.log(new Date(), item);
},
function(error) {
console.log(error);
}
)
Run Code Online (Sandbox Code Playgroud)
这一定是错误的方法,因为在2秒内你得到:
throw e;
^
RangeError: Maximum call stack size exceeded
at CompositeDisposablePrototype.dispose (/Users/me/node_modules/rx/dist/rx.all.js:654:51)
Run Code Online (Sandbox Code Playgroud)
分页的正确方法是什么?
我正在尝试将我的基于承诺的代码转换为RxJs,但很难让我的头围绕Rx尤其是RxJs.
我有一个带路径的数组.
var paths = ["imagePath1","imagePath2"];
Run Code Online (Sandbox Code Playgroud)
我喜欢在Javascript中加载图像
var img = new Image();
img.src = imagePath;
image.onload // <- when this callback fires I'll add them to the images array
Run Code Online (Sandbox Code Playgroud)
当加载所有图像时,我喜欢执行一个方法.
我知道有
Rx.Observable.fromArray(imagepathes)
Run Code Online (Sandbox Code Playgroud)
还有类似的东西
Rx.Observable.fromCallback(...)
Run Code Online (Sandbox Code Playgroud)
并且有类似flatMapLatest(...)
And Rx.Observable.interval或基于时间的调度程序
根据我的研究,我会假设这些将是解决它的成分,但我不能让组合工作.
那么如何从数组路径加载图像,当加载所有图像时,我会根据间隔执行一个方法?
谢谢你的帮助.
在处理RxJS Observable 的最后一次订阅时,执行副作用的最简洁方法是什么?这可能在Observable终止之前发生.
假设我需要一个函数返回一个Observable发出更改资源的函数.我希望在处理完所有订阅后执行清理操作.
var observable = streamResourceChanges(resource);
var subscription1 = observable.subscribe(observer1);
var subscription2 = observable.subscribe(observer2);
// ...
subscription1.dispose(); // Does not perform the cleanup
subscription2.dispose(); // Performs the cleanup
Run Code Online (Sandbox Code Playgroud)
我发现定义订阅处理操作的唯一方法是使用Rx.Observable.create.最后一次处理可以通过共享订阅来处理,例如Observable.prototype.singleInstance().
例如:
function streamResourceChanges(resource) {
return Rx.Observable.create(function(observer) {
// Subscribe the observer for resource changes...
// Return a cleanup function
return function() {
// Perform cleanup here...
console.log("Cleanup performed!");
};
}).singleInstance();
}
Run Code Online (Sandbox Code Playgroud)
有没有更简洁的方法来定义订阅处置的副作用,类似于doOnNext,doOnCompleted或doOnError …
我在使用递归的可观察链时遇到了一些麻烦.
我正在使用RxJS,它目前在1.0.10621版本中,并且包含最基本的Rx功能,与jx的Rx一起使用.
让我为我的问题介绍一个示例场景:我正在轮询Twitter搜索API(JSON响应)以查找包含特定关键字的推文/更新.响应还包括"refresh_url",用于生成后续请求.对该后续请求的响应将再次包含新的refresh_url等.
Rx.jQuery允许我使Twitter搜索API调用一个可观察的事件,它产生一个onNext,然后完成.到目前为止我所尝试的是让onNext处理程序记住refresh_url并在onCompleted处理程序中使用它来为下一个请求生成一个新的observable和相应的观察者.这样,一个可观察的+观察者对无限期地跟随另一个.
这种方法的问题是:
当他们的前任尚未被处置时,后续的可观察/观察者已经活着.
我必须做很多令人讨厌的簿记,以保持对当前生活观察者的有效参考,其实际上可能有两个.(一个在onCompleted,另一个在其生命周期的其他地方)当然,这个参考需要取消订阅/处置观察者.记账的另一种方法是通过"仍在运行?" - 布尔值来实现副作用,正如我在我的例子中所做的那样.
示例代码:
running = true;
twitterUrl = "http://search.twitter.com/search.json";
twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
twitterMaxId = 0; //actually twitter ignores its since_id parameter
newTweetObserver = function () {
return Rx.Observer.create(
function (tweet) {
if (tweet.id > twitterMaxId) {
twitterMaxId = tweet.id;
displayTweet(tweet);
}
}
);
}
createTwitterObserver = function() {
twitterObserver = Rx.Observer.create(
function (response) {
if (response.textStatus == "success") {
var data = response.data;
if (data.error == undefined) …Run Code Online (Sandbox Code Playgroud) 我已经安装了visual studio 2010 express,然后安装了Reactive Extensions 2.0.当我在调试模式下启动我的应用程序时,抛出异常后使用反应式扩展的部分代码:
The assembly with display name 'System.Reactive.Debugger' failed to load in the 'Load' binding context of the AppDomain with ID 1. The cause of the failure was:
System.IO.FileNotFoundException: Could not load file or assembly
'System.Reactive.Debugger, Version=2.0.20823.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35' or one of its dependencies
Run Code Online (Sandbox Code Playgroud)
问题是我无处可找到这个dll - System.Reactive.Debugger.
以下代码在后来发出一个intafter 5000ms,然后是另一个5000ms:
let evens = Observable.interval(5000)
.map(i => {
return i * 2;
});
evens.subscribe((i) => {
console.log(i);
});
Run Code Online (Sandbox Code Playgroud)
是否可以这样做,但是立即得到第一个结果(0ms),然后5000m在后续结果之间等待?