为什么这个Observable.Generate重载会导致内存泄漏?[使用Timespan <15ms]

voq*_*oqk 12 c# system.reactive

在我的机器上大约10秒后,以下Rx.NET代码将消耗大约500 MB的内存.

var stream =
    Observable.Range(0, 10000)
              .SelectMany(i => Observable.Generate(
                  0, 
                  j => true, 
                  j => j + 1, 
                  j => new { N = j },
                  j => TimeSpan.FromMilliseconds(1)));

stream.Subscribe();
Run Code Online (Sandbox Code Playgroud)

如果我使用Observable.Generate没有Func<int, TimeSpan>参数的重载我的内存使用量为35 MB.

var stream =
    Observable.Range(0, 10000)
              .SelectMany(i => Observable.Generate(
                  0,
                  j => true,
                  j => j + 1,
                  j => new { N = j }));
                  // j => TimeSpan.FromMilliseconds(1))); ** Removed! **

stream.Subscribe();
Run Code Online (Sandbox Code Playgroud)

使用SelectMany()或Merge()扩展方法时似乎只是一个问题.

Eni*_*ity 7

这是使用默认调度程序的问题.

使用TimeSpan调度程序的版本DefaultScheduler.没有TimeSpanCurrentThreadScheduler.

因此,对于基于时间的生成,它非常快速地尝试调度所有操作,并且基本上构建了等待执行的大量事件队列.因此它使用了大量内存.

使用非基于时间的生成它使用当前线程,因此它将生成并消耗每个生成的值,因此使用非常少的内存.

哦,这不是内存泄漏.如果您尝试以比可以消耗的速度更快的速度安排无限数量的值,那么这只是正常操作.


我反编译代码以确定使用了哪些调度程序.

这是非基于时间的反编译:

public static IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
{
    if (condition == null)
        throw new ArgumentNullException("condition");
    if (iterate == null)
        throw new ArgumentNullException("iterate");
    if (resultSelector == null)
        throw new ArgumentNullException("resultSelector");
    return Observable.s_impl.Generate<TState, TResult>(initialState, condition, iterate, resultSelector);
}

public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
{
    return (IObservable<TResult>)new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, SchedulerDefaults.Iteration);
}

internal static IScheduler Iteration
{
    get
    {
        return (IScheduler)CurrentThreadScheduler.Instance;
    }
}
Run Code Online (Sandbox Code Playgroud)

上述方法是从Observable,QueryLanguageSchedulerDefaults分别.