Ben*_*rne 24 c# system.reactive
我正在研究使用Observable.Generate来创建一系列结果,这些结果是以msdn网站上的示例为起点间隔采样的.
以下代码没有TimeSpan选择器不会出现内存泄漏:
IObservable<string> obs = Observable.Generate(initialState: 1,
condition: x => x < 1000,
iterate: x => x + 1,
resultSelector: x => x.ToString());
obs.Subscribe(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)
但是,以下代码与TimeSpan选择器显示内存泄漏:
TimeSpan timeSpan = TimeSpan.FromSeconds(1);
IObservable<string> obs = Observable.Generate(initialState: 1,
condition: x => x < 1000,
iterate: x => x + 1,
resultSelector: x => x.ToString(),
timeSelector: x => timeSpan);
obs.Subscribe(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)
例如,这个玩具应用程序将使用VS 2015社区附带的Memory Profiler快速显示内存泄漏:
using System;
using System.Reactive.Linq;
namespace Sample
{
public class Program
{
static void Main()
{
IObservable<string> obs = Observable.Generate(1, x => x < 1000*1000, x => x + 1, x => x.ToString(), x => TimeSpan.FromMilliseconds(500));
obs.Subscribe(x => { /*Do nothing but simply run the observable*/ });
Console.ReadLine();
}
}
}
Run Code Online (Sandbox Code Playgroud)
内存泄漏是一个不断增长的集合:
System.Reactive.Disposables StableCompositeDisposable.Binary
System.Reactive.Disposables SingleAssignmentDisposable
Run Code Online (Sandbox Code Playgroud)
我错误地使用此API吗?我应该期望内存增长还是Reactive的错误?
对我来说这看起来像是一个错误 - 或者至少在DefaultScheduler的"递归"调度实现中出现凌乱/不良行为(它不是真正的递归,我说的是调度程序本身传递给计划操作的重载,所以你可以安排继续).
您看到的一次性用品是通过调用DefaultScheduler.Schedule方法创建的(第71行:https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System .Reactive.Core/Reactive/Concurrency/DefaultScheduler.cs).
有几个原因可以解释为什么其他尝试发现这种情况失败了.首先,一次性物品最终都布置-但只有当产生OnCompletes或者OnErrors,在这点上System.Reactive.AnonymousSafeObserver<T>,当您订阅它确实是干净的增长产生返回.
其次,如果您使用短路TimeSpan(请记住.NET Timer最小分辨率为15ms),那么Rx将优化使用定时器并QueueUserWorkItem在没有使用定时器的情况下调用,因此这些一次性用品不会被创建.
如果你深入了解Generate的实现(https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs)你可以看到它传递了IDisposable初始调用返回的Schedule,将它传递回观察者,观察者一直挂到它,直到错误/完成.这可以防止整个结果的递归调用链被收集 - 并且意味着如果你确实需要取消,或者当清理发生时,只有这样才能处理每个预定的操作的一次性调用.
您可以在下面直接使用DefaultScheduler的代码中看到相同的效果 - cancel在最后一行中的引用足以导致泄漏.确保使用发布版本,否则编译器将保持取消,直到方法结束为止.
// ensure you are using a release build of this code
ManualResetEvent mre = new ManualResetEvent();
IDisposable cancel;
int maxCount = 20;
TimeSpan timeSpan = TimeSpan.FromSeconds(1);
Func<IScheduler, int, IDisposable> recurse = null;
recurse = (self, state) =>
{
Console.WriteLine(state);
if (state == maxCount)
{
mre.Set();
return Disposable.Empty;
}
return self.Schedule(state + 1, timeSpan, recurse);
};
cancel = Scheduler.Default.Schedule(1, timeSpan, recurse);
mre.WaitOne();
// uncomment the following line, and you'll get the same leak
// leave it commented, and cancel reference is GC'd early and there's no leak
// if(cancel == null) Console.WriteLine("Hang on to cancel");
Run Code Online (Sandbox Code Playgroud)
我使用Jetbrains dotMemory API来获取内存转储以得出结论 - 我已经删除了那些API调用的代码,但如果你有这个产品,那么这里有一个完整的要点,你将能够看到最后一行清楚地说明了最后一行:https://gist.github.com/james-world/f20377ea610fb8fc0ee811d27f7a837c或者,您可以使用MS Profiler API - 我目前没有将其分页到我的大脑工作集中!