我需要实现一个版本CombineLatest(我WithLatest在这里称之为),它调用左边每个项目的选择器和右边的最新项目.它不应该仅仅改变右侧的项目.
我认为这是建立Observable.Create还是现有扩展的组合并不是特别重要; 无论哪种方式,我都会将其作为"盒装"扩展方法.
例
var left = new Subject<int>();
var right = new Subject<int>();
left.WithLatest(right, (l,r) => l + " " + r).Dump();
left.OnNext(1); // <1>
left.OnNext(2); // <2>
right.OnNext(1); // <3>
right.OnNext(2); // <4>
left.OnNext(3); // <5>
Run Code Online (Sandbox Code Playgroud)
应该屈服
2 1
3 2
Run Code Online (Sandbox Code Playgroud)
编辑:我的例子的逻辑是:
WithLatest和之间没有区别CombineLatest有人建议我尝试:
var lr = right.ObserveOn(Scheduler.TaskPool).Latest();
left.Select(l => l + " " + lr.First()).Dump();
Run Code Online (Sandbox Code Playgroud)
但这会阻止当前线程进行测试.
您可以使用现有运算符执行此操作.
Func<int, int, string> selector = (l, r) => l + " " + r;
var query = right.Publish(rs => left.Zip(rs.MostRecent(0), selector).SkipUntil(rs));
Run Code Online (Sandbox Code Playgroud)
Publish确保我们只订阅right一次并在所有订阅者之间共享订阅rs.
MostRecent变成一个IObservable<T>成IEnumerable<T>总是产生从源可观察到的最近发射的值.
Zip每次observable发出一个值时,在IObservable<T>和之间IEnumerable<U>发出一个值.
SkipUntil跳过在发出值(l, r)之前发生的对right.
这是使用 Create 的 hacky 方法 - 并没有真正构建它,如果它实际上不起作用,那就是我的错:)
public static IObservable<TRet> WithLatest<TLeft, TRight, TRet>(
this IObservable<TLeft> lhs,
IObservable<TRight> rhs,
Func<TLeft, TRight, TRet> sel)
{
return Observable.Create<TRet>(subj => {
bool rhsSet = false;
bool deaded = false;
var latestRhs = default(TRight);
Action onDeaded = null;
var rhsDisp = rhs.Subscribe(
x => { latestRhs = x; rhsSet = true; },
ex => { subj.OnError(ex); onDeaded(); });
var lhsDisp = lhs
.Where(_ => deaded == false && rhsSet == true)
.Subscribe(
x => subj.OnNext(sel(x, latestRhs)),
ex => { subj.OnError(ex); onDeaded(); },
() => { subj.OnCompleted(); onDeaded(); });
onDeaded = () => {
deaded = true;
if (lhsDisp != null) {
lhsDisp.Dispose();
lhsDisp = null;
}
if (rhsDisp != null) {
rhsDisp.Dispose();
rhsDisp = null;
}
};
return onDeaded;
});
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2083 次 |
| 最近记录: |