Hot observable 和 IDisposable

Dmi*_*try 1 c# system.reactive rx.net

我想找到关于热可观察和 IDisposable 对象作为事件类型的最佳实践。

假设我的代码将 Bitmap 对象生成为热可观察对象,并且我有多个订阅者。例如:

    public static IObservable<Bitmap> ImagesInFolder(string path, IScheduler scheduler)
    {
        return Directory.GetFiles(path, "*.bmp")
            .ToObservable(scheduler)
            .Select(x => new Bitmap(x))
            .Publish()
            .RefCount();
    }

public void Main()
{
    var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance);
    var process1 = images.Subscribe(SaveBwImages);
    var process2 = images.Subscribe(SaveScaledImages);
    var process3 = images.Select(Cats).Subscribe(SaveCatsImages);
}
Run Code Online (Sandbox Code Playgroud)

所以问题是:处理作为热 observable 来源的一次性资源的最佳实践是什么?

在这个例子中,我想在使用后处理图像,但我不知道 - 究竟是什么时候?

订阅事件的调用顺序并不明显,因此我无法处理“最后一个”事件。

提前致谢。

Eni*_*ity 5

你的 observable 不热。这是一个共享源的冷 observable,它只会让后续的观察者表现得好像他们得到了一个热的 observable。它可能最好被描述为一个温暖的可观察对象。

让我们看一个例子:

var query = Observable.Range(0, 3).ObserveOn(Scheduler.Default).Publish().RefCount();

query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); });

Thread.Sleep(10000);

query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });

Observable
    .Range(0, 3)
    .ObserveOn(Scheduler.Default)
    .Publish()
    .RefCount()
    .Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("E"); });
Run Code Online (Sandbox Code Playgroud)

当我运行这个时,我得到:

一种
一种
乙
C
一种
乙
C
乙
乙
乙

“B”和“C”观察者错过了序列的第一个值。

并且,在“A”、“B”和“C”观察者完成之后,序列就完成了,所以“D”永远不会得到值。我不得不创建一个全新的 observable 来显示值“E”。

所以,在你的代码中你有一个问题,如果第一个观察者在第二个和第三个订阅之前完成一个或多个值,那么这些观察者会错过值。那是你要的吗?

不过,您的问题是关于如何处理从 observable 返回的一次性值。如果您使用Observable.Using.

这是与您的代码类似的情况:

public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
    return
        Observable
            .Range(0, 3)
            .ObserveOn(Scheduler.Default)
            .SelectMany(x =>
                Observable
                    .Using(
                        () => Disposable.Create(() => Console.WriteLine("Disposed!")),
                        y => Observable.Return(y)))
        .Publish()
        .RefCount();
}
Run Code Online (Sandbox Code Playgroud)

现在,如果我运行此代码:

var query = ImagesInFolder(Scheduler.Default);

query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); });

Thread.Sleep(10000);

query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });
Run Code Online (Sandbox Code Playgroud)

我得到这个输出:

一种
乙
C
处置!
一种
乙
C
处置!
一种
乙
C
处置!

同样,“D”永远不会产生任何值——“B”和“C”可能会遗漏值,但这确实显示了如何返回一个可观察值,该值自动被观察者处理/已完成。

您的代码如下所示:

public static IObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
    return
        Directory
            .GetFiles(path, "*.bmp")
            .ToObservable(scheduler)
            .SelectMany(x =>
                Observable
                    .Using(
                        () => new System.Drawing.Bitmap(x),
                        bm => Observable.Return(bm)))
        .Publish()
        .RefCount();
}
Run Code Online (Sandbox Code Playgroud)

但是,您仍然处于可能缺失值的领域。

因此你需要真正做到这一点:

public static IConnectableObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
    return
        Directory
            .GetFiles(path, "*.bmp")
            .ToObservable(scheduler)
            .SelectMany(x =>
                Observable
                    .Using(
                        () => new System.Drawing.Bitmap(x),
                        bm => Observable.Return(bm)))
            .Publish();
}
Run Code Online (Sandbox Code Playgroud)

然后你这样称呼它:

public void Main()
{
    var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance);
    var process1 = images.Subscribe(SaveBwImages);
    var process2 = images.Subscribe(SaveScaledImages);
    var process3 = images.Select(Cats).Subscribe(SaveCatsImages);
    images.Connect();
}
Run Code Online (Sandbox Code Playgroud)

另一种选择是删除整个.Publish().RefCount()代码,并确保在订阅时自己正确执行。

试试这个代码:

void Main()
{
    ImagesInFolder(Scheduler.Default)
        .Publish(iif =>
            Observable
                .Merge(
                    iif.Select(x => { Thread.Sleep(1000); Console.WriteLine("A"); return "A"; }),
                    iif.Select(x => { Thread.Sleep(3000); Console.WriteLine("B"); return "B"; }),
                    iif.Select(x => { Thread.Sleep(2000); Console.WriteLine("C"); return "C"; })))
        .Subscribe();
}

public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
    return
        Observable
            .Range(0, 3)
            .ObserveOn(Scheduler.Default)
            .SelectMany(x =>
                Observable
                    .Using(
                        () => Disposable.Create(() => Console.WriteLine("Disposed!")),
                        y => Observable.Return(y)));
}
Run Code Online (Sandbox Code Playgroud)

我得到这个:

一种
乙
C
处置!
一种
乙
C
处置!
一种
乙
C
处置!

再次,Disposed!每个观察者运行后一个,但现在的问题是我改变了每个观察者的处理延迟,但代码仍然输出观察者添加的顺序。问题是 Rx 按顺序运行每个观察者,并且生成的每个值都是按顺序进行的。

我希望您认为您可以使用.Publish(). 你没有。

让它并行运行的方法是.Publish()完全删除。

只需做这样的事情:

void Main()
{
    ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
    ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(3000); Console.WriteLine("B"); });
    ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(2000); Console.WriteLine("C"); });
}

public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
    return
        Observable
            .Range(0, 3)
            .ObserveOn(Scheduler.Default)
            .SelectMany(x =>
                Observable
                    .Using(
                        () => Disposable.Create(() => Console.WriteLine("Disposed!")),
                        y => Observable.Return(y)));
}
Run Code Online (Sandbox Code Playgroud)

我现在明白了:

一种
处置!
C
处置!
一种
处置!
乙
处置!
一种
处置!
C
处置!
C
处置!
乙
处置!
乙
处置!

代码现在并行运行并尽快完成 - 并IDisposable在订阅完成时正确处理。你只是没有享受与每个观察者共享单一一次性资源的乐趣,但你也没有得到所有的行为问题。