Reactive Extensions - 属性相互更新

Mim*_*ord 5 c# asynchronous system.reactive

我有2个DecimalUpDown控件,num_one和num_two,分别绑定到属性First和Second.当First更改时,它将联系服务器以计算Second的值,反之亦然.异步启动服务器调用释放UI但是,在快速触发(例如滚轮)时,最后一个请求并不总是最后一个请求,因此值可能会不同步.

使用Reactive我试图阻止调用只在用户停止更改一段时间后才触发服务器调用.问题是,当您在更新期间进行更改时,属性更改开始相互触发并根据节流阀的TimeSpan来回停留.

    public MainWindow()
    {
        InitializeComponent();

        DataContext = this;

        Observable.FromEventPattern<RoutedPropertyChangedEventHandler<object>, RoutedPropertyChangedEventArgs<object>>(h => num_one.ValueChanged += h, h => num_one.ValueChanged -= h)
            .Throttle(TimeSpan.FromMilliseconds(100), Scheduler.ThreadPool)
           .Subscribe(x =>
           {
               Thread.Sleep(300); // simulate work
               Second = (decimal)x.EventArgs.NewValue / 3.0m;
           });

        Observable.FromEventPattern<RoutedPropertyChangedEventHandler<object>, RoutedPropertyChangedEventArgs<object>>(h => num_two.ValueChanged += h, h => num_two.ValueChanged -= h)
            .Throttle(TimeSpan.FromMilliseconds(100), Scheduler.ThreadPool)
           .Subscribe(x =>
           {
               Thread.Sleep(300); // simulate work
               First = (decimal)x.EventArgs.NewValue * 3.0m;
           });
    }

    private decimal first;
    public decimal First
    {
        get { return first; }
        set
        {
            first = value;
            NotifyPropertyChanged("First");
        }
    }

    private decimal second;
    public decimal Second
    {
        get { return second; }
        set
        {
            second = value;
            NotifyPropertyChanged("Second");
        }
    }
Run Code Online (Sandbox Code Playgroud)

Eni*_*ity 7

有一个内置的Rx操作符可以帮助您在不使用Throttle和超时的情况下完成您想要的Switch操作- 它是操作员.

Switch运营商不工作的IObservable<T>所以大多数时候,你永远不会看到它在智能感知.

相反,它操作IObservable<IObservable<T>>- 一个可观察的流 - 并IObservable<T>通过不断切换到生成的最新可观察量(并忽略先前可观察量的任何值)来平坦化源.它只在外部observable完成时才完成,而不是在内部observable完成时完成.

这正是您想要的 - 如果发生新的值更改,则忽略任何先前的结果,并仅返回最新的结果.

这是怎么做的.

首先,我将令人讨厌的事件处理代码移到了几个可观察对象中.

var ones =
    Observable
        .FromEventPattern<
            RoutedPropertyChangedEventHandler<object>,
            RoutedPropertyChangedEventArgs<object>>(
            h => num_one.ValueChanged += h,
            h => num_one.ValueChanged -= h)
        .Select(ep => (decimal)ep.EventArgs.NewValue);

var twos =
    Observable
        .FromEventPattern<
            RoutedPropertyChangedEventHandler<object>,
            RoutedPropertyChangedEventArgs<object>>(
            h => num_two.ValueChanged += h,
            h => num_two.ValueChanged -= h)
        .Select(ep => (decimal)ep.EventArgs.NewValue);
Run Code Online (Sandbox Code Playgroud)

你的代码似乎有点混乱.我假设DecimalUpDown控件的值是返回结果的服务器函数的输入.所以这里是调用服务器的函数.

Func<decimal, IObservable<decimal>> one2two = x =>
    Observable.Start(() =>
    {
        Thread.Sleep(300); // simulate work
        return x / 3.0m;
    });

Func<decimal, IObservable<decimal>> two2one = x =>
    Observable.Start(() =>
    {
        Thread.Sleep(300); // simulate work
        return x * 3.0m;
    });
Run Code Online (Sandbox Code Playgroud)

显然,您在这两个函数中放入了实际的服务器代码调用.

现在连接最终的可观察量和订阅几乎是微不足道的.

ones
    .DistinctUntilChanged()
    .Select(x => one2two(x))
    .Switch()
    .Subscribe(x =>
    {
        Second = x;
    });

twos
    .DistinctUntilChanged()
    .Select(x => two2one(x))
    .Switch()
    .Subscribe(x =>
    {
        First = x;
    });
Run Code Online (Sandbox Code Playgroud)

DistinctUntilChanged确保我们只拨打电话,如果实际值发生变化.

然后,很容易调用两个服务器函数,执行Switch并仅返回最新结果,然后将其分配给属性.

您可能需要在此处或那里弹出调度程序以及ObserveOn将订阅传递到UI线程,否则此解决方案应该可以很好地工作.