CombineLatest,但只推左

Dan*_*ore 7 system.reactive

我需要实现一个版本CombineLatest(我WithLatest在这里称之为),它调用左边每个项目的选择器和右边的最新项目.它不应该仅仅改变右侧的项目.

我认为这是建立Observable.Create还是现有扩展的组合并不是特别重要; 无论哪种方式,我都会将其作为"盒装"扩展方法.

var left = new Subject<int>();
var right = new Subject<int>();

left.WithLatest(right, (l,r) => l + " " + r).Dump();

left.OnNext(1);   // <1>
left.OnNext(2);   // <2>
right.OnNext(1);  // <3>
right.OnNext(2);  // <4>
left.OnNext(3);   // <5>
Run Code Online (Sandbox Code Playgroud)

应该屈服

2 1
3 2
Run Code Online (Sandbox Code Playgroud)

编辑:我的例子的逻辑是:

  1. 左侧变为填充1.右侧为空,没有值被推送.
  2. 左边用2更新(它忘记了以前的值).右边仍然是空的,所以没有任何东西被推开.
  3. 右侧变为1,因此左= 2(最新值),右= 1被推.到目前为止,WithLatest和之间没有区别CombineLatest
  4. 权利更新 - 没有任何推动.这有什么不同
  5. Left更新为3,因此Left = 3,Right = 2(最新值)被推送.

有人建议我尝试:

var lr = right.ObserveOn(Scheduler.TaskPool).Latest();
left.Select(l => l + " " + lr.First()).Dump();
Run Code Online (Sandbox Code Playgroud)

但这会阻止当前线程进行测试.

Tim*_*lds 5

您可以使用现有运算符执行此操作.

Func<int, int, string> selector = (l, r) => l + " " + r;

var query = right.Publish(rs => left.Zip(rs.MostRecent(0), selector).SkipUntil(rs));
Run Code Online (Sandbox Code Playgroud)
  • Publish确保我们只订阅right一次并在所有订阅者之间共享订阅rs.

  • MostRecent变成一个IObservable<T>IEnumerable<T>总是产生从源可观察到的最近发射的值.

  • Zip每次observable发出一个值时,在IObservable<T>和之间IEnumerable<U>发出一个值.

  • SkipUntil跳过在发出值(l, r)之前发生的对right.


Ana*_*tts 0

这是使用 Create 的 hacky 方法 - 并没有真正构建它,如果它实际上不起作用,那就是我的错:)

public static IObservable<TRet> WithLatest<TLeft, TRight, TRet>(
        this IObservable<TLeft> lhs, 
        IObservable<TRight> rhs, 
        Func<TLeft, TRight, TRet> sel)
{
    return Observable.Create<TRet>(subj => {
        bool rhsSet = false;
        bool deaded = false;
        var latestRhs = default(TRight);

        Action onDeaded = null;

        var rhsDisp = rhs.Subscribe(
            x => { latestRhs = x; rhsSet = true; }, 
            ex => { subj.OnError(ex); onDeaded(); });

        var lhsDisp = lhs
            .Where(_ => deaded == false && rhsSet == true)
            .Subscribe(
                x => subj.OnNext(sel(x, latestRhs)),
                ex => { subj.OnError(ex); onDeaded(); },
                () => { subj.OnCompleted(); onDeaded(); });

        onDeaded = () => {
            deaded = true;
            if (lhsDisp != null) {
                lhsDisp.Dispose();
                lhsDisp = null;
            }
            if (rhsDisp != null) {
                rhsDisp.Dispose();
                rhsDisp = null;
            }
        };

        return onDeaded;
    });
}
Run Code Online (Sandbox Code Playgroud)