无法取消订阅 Rx

Sno*_*oop 2 c# task reactive-programming winforms system.reactive

背景

我正在编写一些执行以下操作的软件:

  1. 用户单击“开始”。
  2. 启动一个任务来执行一些工作并启动事件来更新 GUI。
  3. 可观察对象使用任务中的事件,并将数据打印到 GUI 中的富文本框。

当我第一次单击“开始”时,一切正常,但之后就不行了。第一次单击开始时,我得到如下输出:

单击开始一次

这看起来不错,没有什么问题。但是,当我第二次单击“开始”时,我得到以下输出。

再次点击开始

现在,我相信我知道为什么会发生这种情况。据我所知,从我第一次单击“开始”开始,我的观察者就从未取消订阅,因此所有内容都会打印两次。单击开始按钮时,会发生以下情况:

    /// <summary>
    /// Starts the test.
    /// </summary>
    /// <param name="sender">The "start" button.</param>
    /// <param name="e">Clicking on the "start" button.</param>
    private void button_go_Click(object sender, RoutedEventArgs e)
    {
        var uiContext = SynchronizationContext.Current;
        var results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
            handler => (s, a) => handler(s, a),
            handler => this.myTest.Results += handler,
            handler => this.myTest.Results -= handler)
            .ObserveOn(uiContext)
            .Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));

        // start running the test
        this.runningTest = new Task(() => { this.myTest.Run(); });
        this.runningTest.Start();

        // block on purpose, wait for the task to finish
        this.runningTest.Wait();
    }
Run Code Online (Sandbox Code Playgroud)

我对 .NET 中的反应式扩展很陌生(使用它还不到一小时)。我本质上使用的是 Stephen Cleary 在 C# 并发书中的示例之一,以便在这里开始。尽管如此,我相信我知道问题出在哪里......

拟议计划

如果我可以取消订阅可观察的内容,那么我认为这个问题就会消失。我认为这有点复杂,但根据ReactiveX.io使用的反应模式,听起来我实际上应该只在任务实际存在的期间聪明地观察任务,然后自然取消订阅。这是我调试此问题所遇到的情况,我不能 100% 确定取消订阅确实能解决问题。

问题

有什么方法可以取消订阅可观察的内容吗?或者,有没有一种方法可以让我只在任务存在期间观察它?

Eni*_*ity 5

Rx 使用该IDisposable接口使您能够取消订阅尚未自然结束的可观察订阅。如果您的可观察对象发送OnCompletedOnError通知,则订阅会自动为您处理。

这种方法的一个明显优点是,您可以创建一个CompositeDisposable来聚合所有订阅,从而实现单次取消订阅。这比删除事件处理程序所需的大杂烩要好得多。

在您的代码中,您不会结束订阅,因此每次点击button_go您都会创建一个新的订阅。

您可以使用四种解决方案,每种都略有不同。

(1)

请记住,需要调用的关键.Dispose()是您的可观察订阅是否尚未自然结束并且您希望它结束​​。因此,您只需.Take(1)在查询中添加 a 即可使其在生成一个值后自然结束。

private void button_go_Click(object sender, RoutedEventArgs e)
{
    var uiContext = SynchronizationContext.Current;
    var results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
        handler => (s, a) => handler(s, a),
        handler => this.myTest.Results += handler,
        handler => this.myTest.Results -= handler)
        .Take(1)
        .ObserveOn(uiContext)
        .Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));

    // start running the test
    this.runningTest = new Task(() => { this.myTest.Run(); });
    this.runningTest.Start();

    // block on purpose, wait for the task to finish
    this.runningTest.Wait();
}
Run Code Online (Sandbox Code Playgroud)

然后系统会自动为您处置订阅。

(2)

您可以用来SerialDisposable管理每个订阅。MSDN 文档将其描述为:

表示一个一次性产品,其底层一次性产品可以交换为另一个一次性产品,这会导致之前的底层一次性产品被丢弃。

然后你的代码将如下所示:

private SerialDisposable _results = new SerialDisposable();

private void button_go_Click(object sender, RoutedEventArgs e)
{
    var uiContext = SynchronizationContext.Current;
    _results.Disposable = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
        handler => (s, a) => handler(s, a),
        handler => this.myTest.Results += handler,
        handler => this.myTest.Results -= handler)
        .ObserveOn(uiContext)
        .Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));

    // start running the test
    this.runningTest = new Task(() => { this.myTest.Run(); });
    this.runningTest.Start();

    // block on purpose, wait for the task to finish
    this.runningTest.Wait();
}
Run Code Online (Sandbox Code Playgroud)

(3)

您始终可以确保只创建一次订阅。

private IDisposable _results = null;

private void button_go_Click(object sender, RoutedEventArgs e)
{
    if (_results == null)
    {
        var uiContext = SynchronizationContext.Current;
        _results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
            handler => (s, a) => handler(s, a),
            handler => this.myTest.Results += handler,
            handler => this.myTest.Results -= handler)
            .ObserveOn(uiContext)
            .Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));
    }

    // start running the test
    this.runningTest = new Task(() => { this.myTest.Run(); });
    this.runningTest.Start();

    // block on purpose, wait for the task to finish
    this.runningTest.Wait();
}
Run Code Online (Sandbox Code Playgroud)

(4)

最后一种方法是将整个操作包装在一个可观察对象中,并为每个新订阅发起。这是使用 Rx 的正确方法。Observables 应该保持自己的状态,以便订阅可以 100% 相互独立。

现在,您的代码使用this.myTest&this.runningTest因此它显然具有状态。您应该尝试删除这些。但如果不这样做,您的代码将如下所示:

private void button_go_Click(object sender, RoutedEventArgs e)
{
    var uiContext = SynchronizationContext.Current;
    Observable
        .Create<System.Reactive.EventPattern<TestResultArgs>>(o =>
        {
            var subscription =
                Observable
                    .FromEventPattern<TestResultHandler, TestResultArgs>(
                        h => this.myTest.Results += h,
                        h => this.myTest.Results -= h)
                    .Take(1)
                    .Subscribe(o);
            this.runningTest = new Task(() => { this.myTest.Run(); });
            this.runningTest.Start();
            return subscription;
        })
        .ObserveOn(uiContext)
        .Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));

    // block on purpose, wait for the task to finish
    this.runningTest.Wait();
}
Run Code Online (Sandbox Code Playgroud)

myTest理想情况下,您应该将可观察对象的创建和销毁结合起来。

所以,我倾向于做这样的事情:

private void button_go_Click(object sender, RoutedEventArgs e)
{
    var uiContext = SynchronizationContext.Current;
    Observable
        .Create<System.Reactive.EventPattern<TestResultArgs>>(o =>
        {
            var myTest = new MyTest();
            var subscription =
                Observable
                    .FromEventPattern<TestResultHandler, TestResultArgs>(
                        h => myTest.Results += h,
                        h => myTest.Results -= h)
                    .Take(1)
                    .Subscribe(o);
            myTest.Run();
            return subscription;
        })
        .ObserveOn(uiContext)
        .Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));
}
Run Code Online (Sandbox Code Playgroud)

最后一个确实是您问题的答案:“或者,有没有一种方法可以让我只在任务存在期间观察它?”