订阅在Reactive Extensions中结束时关闭非托管资源

Dor*_*rus 1 c# system.reactive

我正在从Rx向网络写入数据.当然,我Finally在订阅结束时用来关闭我的流.这在干净的OnError()OnComplete().RX将运行OnNext() ... OnNext(),OnComplete(),Finally()按顺序排列.

但是,有时我想提前终止序列,我这样做是为了使用Dispose().不知怎的Finally(),现在与最后一次OnNext()调用并行运行,导致仍然写入流中的异常OnNext(),以及不完整的写入.

我的订阅看起来大致如下:

NetworkStream stm = client.GetStream();
IDisposable disp = obs
    .Finally(() => {
        client.Close();
    })
    .Subscribe(d => {
        client.GetStream().Write(d.a, 0, d.a.Lenght);
        client.GetStream().Write(d.b, 0, d.b.Lenght);
    } () => {
        client.GetStream().Write(something(), 0, 1);
    });
Thread.sleep(1000);
disp.Dispose();
Run Code Online (Sandbox Code Playgroud)

我也试过了替代方案CancellationToken.

如何正确取消订阅?我不介意它跳过OnComplete(),只要Finally()还在运行.但是,Finally()并行运行存在问题.

我也觉得应该有一个更好的方法来管理资源,通过将声明移动到序列中,这将是一个更好的解决方案.

编辑:以下代码更清楚地显示问题.我希望它总是打印出来,相反,它会经常给出错误,表示在最后一次之前Dispose结束OnNext.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Try finally");
            for (int i = 0; i < 10; i++)
            {
                Finally();
            }
            Console.WriteLine("Try using");
            for (int i = 0; i < 10; i++)
            {
                Using();
            }
            Console.WriteLine("Try using2");
            for (int i = 0; i < 10; i++)
            {
                Using2();
            }
            Console.ReadKey();
        }

        private static void Using2()
        {
            bool b = true, c = true, d;
            var dis = Disposable.Create(() => c = b);
            IDisposable obDis = Observable.Using(
                () => dis,
                _ => Observable.Create<Unit>(obs=>
                    Observable.Generate(0,
                    i => i < 1000,
                    i => i + 1,
                    i => i,
                    i => TimeSpan.FromMilliseconds(1)
                ).Subscribe(__ => { b = false; Thread.Sleep(100); b = true; })))
                .Subscribe();
            Thread.Sleep(15);
            obDis.Dispose();
            d = b;
            Thread.Sleep(101);
            Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
        }

        private static void Using()
        {
            bool b = true, c = true, d;
            var dis = Disposable.Create(() => c = b);
            IDisposable obDis = Observable.Using(
                () => dis,
                _ => Observable.Generate(0,
                    i => i < 1000,
                    i => i + 1,
                    i => i,
                    i => TimeSpan.FromMilliseconds(1)
                )).Subscribe(_ => { b = false; Thread.Sleep(100); b = true; });
            Thread.Sleep(15);
            obDis.Dispose();
            d = b;
            Thread.Sleep(101);
            Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
        }

        private static void Finally()
        {
            bool b = true, c = true, d;
            IDisposable obDis = Observable.Generate(0,
                i => i < 1000,
                i => i + 1,
                i => i,
                _ => DateTime.Now.AddMilliseconds(1)
                )
                .Finally(() => c = b)
                .Subscribe(_ => { b = false; Thread.Sleep(100); b = true; });
            Thread.Sleep(15);
            obDis.Dispose();
            d = b;
            Thread.Sleep(101);
            Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Dan*_*ber 5

Finally很可能不是你想要的.取消订阅时,它不会处置您的资源.相反,它的行为就像finallyC#中的普通块一样,也就是说,无论相应的try-block中的代码是否引发异常,它都将保证执行某些代码.此外,鉴于MSDN上的这个问题,您的代码Finally甚至可能不会在每种情况下执行,因为您的订阅未指定错误处理程序.

你可能想要的是Using:

IDisposable disp = Observable
    .Using(
        () => Disposable.Create(() => client.Close),
        _ => obs)
    .Subscribe(....);
Run Code Online (Sandbox Code Playgroud)

Using 只要可观察终止或订阅被取消,就会小心处理资源.

假设client是一个TcpClient,它变得更简单:

IDisposable disp = Observable
    .Using(
        () => client),
        _ => obs)
    .Subscribe(....);
Run Code Online (Sandbox Code Playgroud)

我希望调用OnNext不会与关闭客户端重叠,即使提前取消预订,但我还没有测试过.

最后一件事:注意关闭外部变量,如stm示例中所示.总是和当地人一起工作更安全.完全重写,因为我会尝试它是这样的:

IDisposable disp = Observable.Using(
    () => client,
    _ => Observable.Using(
         () => client.GetStream(),
         stream => Observable.Create<Unit>(observer => obs
             .Subscribe(
                 d => {
                     stream.Write(d.a, 0, d.a.Lenght);
                     stream.Write(d.b, 0, d.b.Lenght);
                 },
                 () => {
                     stream.Write(something(), 0, 1);
                 }))))
    .Subscribe();
Run Code Online (Sandbox Code Playgroud)