带有TimeSpan选择器的Observable.Generate似乎泄漏内存[使用TimeSpan> 15ms时]

Ben*_*rne 24 c# system.reactive

我正在研究使用Observable.Generate来创建一系列结果,这些结果是以msdn网站上的示例为起点间隔采样的.

以下代码没有TimeSpan选择器不会出现内存泄漏:

IObservable<string> obs = Observable.Generate(initialState: 1,
                                              condition: x => x < 1000,
                                              iterate: x => x + 1,
                                              resultSelector: x => x.ToString());
obs.Subscribe(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)

但是,以下代码与TimeSpan选择器显示内存泄漏:

TimeSpan timeSpan = TimeSpan.FromSeconds(1);
IObservable<string> obs = Observable.Generate(initialState: 1,
                                              condition: x => x < 1000,
                                              iterate: x => x + 1,
                                              resultSelector: x => x.ToString(),
                                              timeSelector: x => timeSpan);
obs.Subscribe(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)

例如,这个玩具应用程序将使用VS 2015社区附带的Memory Profiler快速显示内存泄漏:

using System;
using System.Reactive.Linq;

namespace Sample
{
    public class Program
    {
        static void Main()
        {
            IObservable<string> obs = Observable.Generate(1, x => x < 1000*1000, x => x + 1, x => x.ToString(), x => TimeSpan.FromMilliseconds(500));
            obs.Subscribe(x => { /*Do nothing but simply run the observable*/ });
            Console.ReadLine();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

内存泄漏是一个不断增长的集合:

System.Reactive.Disposables StableCompositeDisposable.Binary
System.Reactive.Disposables SingleAssignmentDisposable
Run Code Online (Sandbox Code Playgroud)

我错误地使用此API吗?我应该期望内存增长还是Reactive的错误?

Jam*_*rld 6

对我来说这看起来像是一个错误 - 或者至少在DefaultScheduler的"递归"调度实现中出现凌乱/不良行为(它不是真正的递归,我说的是调度程序本身传递给计划操作的重载,所以你可以安排继续).

您看到的一次性用品是通过调用DefaultScheduler.Schedule方法创建的(第71行:https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System .Reactive.Core/Reactive/Concurrency/DefaultScheduler.cs).

有几个原因可以解释为什么其他尝试发现这种情况失败了.首先,一次性物品最终都布置-但只有当产生OnCompletes或者OnErrors,在这点上System.Reactive.AnonymousSafeObserver<T>,当您订阅它确实是干净的增长产生返回.

其次,如果您使用短路TimeSpan(请记住.NET Timer最小分辨率为15ms),那么Rx将优化使用定时器并QueueUserWorkItem在没有使用定时器的情况下调用,因此这些一次性用品不会被创建.

如果你深入了解Generate的实现(https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs)你可以看到它传递了IDisposable初始调用返回的Schedule,将它传递回观察者,观察者一直挂到它,直到错误/完成.这可以防止整个结果的递归调用链被收集 - 并且意味着如果你确实需要取消,或者当清理发生时,只有这样才能处理每个预定的操作的一次性调用.

您可以在下面直接使用DefaultScheduler的代码中看到相同的效果 - cancel在最后一行中的引用足以导致泄漏.确保使用发布版本,否则编译器将保持取消,直到方法结束为止.

// ensure you are using a release build of this code
ManualResetEvent mre = new ManualResetEvent();
IDisposable cancel;
int maxCount = 20;

TimeSpan timeSpan = TimeSpan.FromSeconds(1);

Func<IScheduler, int, IDisposable> recurse = null;
recurse = (self, state) =>
{
    Console.WriteLine(state);

    if (state == maxCount)
    {
        mre.Set();
        return Disposable.Empty;
    }

    return self.Schedule(state + 1, timeSpan, recurse);
};

cancel = Scheduler.Default.Schedule(1, timeSpan, recurse);

mre.WaitOne();

// uncomment the following line, and you'll get the same leak
// leave it commented, and cancel reference is GC'd early and there's no leak
// if(cancel == null) Console.WriteLine("Hang on to cancel");
Run Code Online (Sandbox Code Playgroud)

我使用Jetbrains dotMemory API来获取内存转储以得出结论 - 我已经删除了那些API调用的代码,但如果你有这个产品,那么这里有一个完整的要点,你将能够看到最后一行清楚地说明了最后一行:https://gist.github.com/james-world/f20377ea610fb8fc0ee811d27f7a837c或者,您可以使用MS Profiler API - 我目前没有将其分页到我的大脑工作集中!

  • "我没有把我的大脑的工作集分页"我将从现在开始使用这句话.随着岁月的流逝,可能更频繁! (2认同)