Microsoft已发布Javascript的Reactive Extensions.它应该使异步(和基于事件的)web-ui编程变得容易.
与其他"FRP"库不同,Rx不会阻止毛刺:使用时间不匹配的数据调用回调.有办法解决这个问题吗?
作为一个例子,假设我们有一系列昂贵的计算从单个流派生(例如,而不是_.identity,下面,我们做一个排序,或ajax提取).我们做了明确的改变,以避免重新计算昂贵的东西.
sub = new Rx.Subject();
a = sub.distinctUntilChanged().share();
b = a.select(_.identity).distinctUntilChanged().share();
c = b.select(_.identity).distinctUntilChanged();
d = Rx.Observable.combineLatest(a, b, c, function () { return _.toArray(arguments); });
d.subscribe(console.log.bind(console));
sub.onNext('a');
sub.onNext('b');
Run Code Online (Sandbox Code Playgroud)
第二个事件最终将导致一些错误的状态:我们得到三个事件,而不是一个,这浪费了一堆cpu并要求我们明确解决不匹配的数据.
这个特定的例子可以通过删除distinctUntilChanged来解决,如果输入没有改变,可以编写一些不稳定的scan()函数来传递前一个结果.然后你可以压缩结果,而不是使用combineLatest.它很笨拙但可行.
但是,如果在任何地方存在异步,例如ajax调用,则zip不起作用:ajax调用将同步(如果已缓存)或异步完成,因此您无法使用zip.
编辑
尝试使用更简单的示例来阐明所需的行为:
你有两个流,a和b.b取决于a.b是异步的,但是浏览器可以缓存它,因此它可以独立地更新,也可以同时更新.因此,浏览器中的特定事件可能会导致以下三种情况之一:更新; b更新; a和b都更新.期望的行为是在所有三种情况下仅调用一次回调(例如渲染方法).
zip不起作用,因为当a或b单独触发时,我们不会收到来自zip的回调.combineLatest不起作用,因为当a和b一起发射时,我们得到两个回调.
我有两个行为主题流我正在尝试forkJoin没有运气.正如我想象的那样,它会回馈它的最后两个值.这可能以某种方式实现吗?
在主题之后不会调用它.
let stream1 = new BehaviorSubject(2);
let stream2 = new BehaviorSubject('two');
Observable.forkJoin(stream1, stream2)
.subscribe(r => {
console.log(r);
});
Run Code Online (Sandbox Code Playgroud) 我有一个可观察的集合,我想继续喂养对象,即使有人订阅了它,它们也应该到达观察者(哪个是可观察的主要目标).我该怎么做?
在以下程序中,在订阅发生之后,我想要输入3个以下的数字,这些数字应该到达观察者.我该怎么做呢?
我不想通过实现IObservable<int>和使用Publish方法来实现我自己的Observable类的路径?有没有其他方法来实现这一目标?
public class Program
{
static void Main(string[] args)
{
var collection = new List<double> { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
var observableCollection = collection.ToObservable();
observableCollection.Subscribe(OnNext);
//now I want to add 100, 101, 102 which should reach my observers
//I know this wont' work
collection.Add(100);
collection.Add(101);
collection.Add(102);
Console.ReadLine();
}
private static void OnNext(double i)
{
Console.WriteLine("OnNext - {0}", i);
}
}
Run Code Online (Sandbox Code Playgroud) 有没有像RxJS那样的解决方案? 是否可以在Rx中的不同线程上调用订阅者的OnNexts?
PS我的第一个天真的方法(在CoffeeScript中)明显失败了:
hObs = Rx.Observable.interval(35000)
.startWith(-1)
.select(moment().format("D MMMM, HH:mm:ss"))
.publish()
hObs.subscribe((x)->console.log(x))
hObs.connect()
hObs.subscribe((x)->console.log(x, 1))
hObs.connect()
Run Code Online (Sandbox Code Playgroud)
第二个订阅在35秒间隔内不返回任何内容,依此类推
我希望以下代码将异步运行:
var range = Rx.Observable.range(0, 3000000);
range.subscribe(
function(x) {},
function(err) {},
function() {
console.log('Completed');
});
console.log('Hello World');
Run Code Online (Sandbox Code Playgroud)
但事实并非如此.需要一段时间才能完成大范围的数字,只有当它完成后才能恢复执行,你可以在这里尝试代码.
我很困惑何时期望RxJS同步或异步地运行.它取决于使用的方法吗?我之前的想法是,一旦我们进入Observables/Observer土地,其中的所有内容都是异步运行的,类似于承诺的工作方式.
javascript asynchronous system.reactive reactive-extensions-js rxjs
我在RxJS中遇到了一个特殊的生产者消费者问题:生产者慢慢生产元素.消费者正在请求元素,并且经常必须等待生产者.这可以通过压缩生产者和请求流来实现:
var produce = getProduceStream();
var request = getRequestStream();
var consume = Rx.Observable.zipArray(produce, request).pluck(0);
Run Code Online (Sandbox Code Playgroud)
有时请求会中止.生成的元素只应在未中止请求后使用:
produce: -------------p1-------------------------p2--------->
request: --r1--------------r2---------------r3-------------->
abort: ------a(r1)------------------a(?)------------------>
consume: ------------------c(p1, r2)-------------c(p2, r3)-->
Run Code Online (Sandbox Code Playgroud)
第一个请求r1将使用第一个生成的元素p1,但在它可以消耗之前r1被中止.生成并在第二次请求时消耗.第二次中止被忽略,因为之前没有发出未答复的请求.第三个请求必须等待下一个生成的元素,并且在生成之前不会中止.因此,在生产后立即消耗.a(r1)p1p1c(p1, r2)r2a(?)r3p2p2p2c(p2, r3)
我怎样才能在RxJS中实现这一目标?
编辑:
我创建了一个例子与jsbin一个QUnit测试.您可以编辑该功能createConsume(produce, request, abort)以尝试/测试您的解决方案.
该示例包含先前接受的答案的函数定义.
为什么flatMap不会导致下游减少火灾?
我得到的代码如下:
handleFiles.flatMap(files =>
Rx.Observable.from(files).
flatMap((file, i) => fileReader(file, i)).
reduce((form, file, i) => {
form.append('file[' + i + ']', result);
console.log('reduce step', file);
return form;
}, new FormData()).
tap(console.log.bind(console, 'after reduce'))
).
subscribe(console.log.bind(console, 'response'));
Run Code Online (Sandbox Code Playgroud)
而问题是"减少后"点击从未被击中.为什么?
日志如下:
reduce step [data]
reduce step [data]
Run Code Online (Sandbox Code Playgroud)
截图:
我有一组可更改的子组件(POJO 对象),每个组件都有自己的状态流。每次用户触发 addChild/removeChild/clearChildren 时,都会使用 #switchMap 发出一组新的子状态流。到目前为止,一切都很好!(对 RxJS 感到非常惊讶!)
只要不是空数组Rx.Observable.from(arrayOfStateStreams).combineAll(),我就能得到很好的结果。arrayOfStateStreams
由于这是在更高级别上组合(最新)的部分状态,因此我需要发出一个空数组,否则全局状态树将包含不再正确的旧状态数据!
我可以发出一些保留令牌,例如['EMPTY-ARRAY-PLACEHOLDER-TOKEN'],但这很奇怪。更好的方法是始终将最后一个流附加到数组中,因此最后一个索引可以被视为垃圾。但仍然令人困惑的代码和状态。使用[null]是不行的,因为我们可能有一个子状态'null'.
谁能用好的方法解决这个问题?因为#combineAll 之后不应该有其他空数组的表示,所以不能支持这一点吗?