Why does Observable.Generate() throw System.StackOverflowException?

bas*_*ick 9 .net c# scheduler system.reactive

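I'm writing a C# (.NET 4.5) application that aggregates time-based events for reporting. To make my query logic reusable for both real-time and historical data, I use the Reactive Extensions (2.0) and their IScheduler infrastructure (HistoricalScheduler and friends).

For example, suppose we create a list of events (ordered chronologically, but they may coincide!) whose only payload is their timestamp, and we want to know how they are distributed across buffers of a fixed duration: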

const int num = 100000;
const int dist = 10;

var events = new List<DateTimeOffset>();
var curr = DateTimeOffset.Now;
var gap = new Random();

var time = new HistoricalScheduler(curr);

for (int i = 0; i < num; i++)
{
    events.Add(curr);
    curr += TimeSpan.FromMilliseconds(gap.Next(dist));
}

var stream = Observable.Generate<int, DateTimeOffset>(
    0,
    s => s < events.Count,
    s => s + 1,
    s => events[s],
    s => events[s],
    time);

stream.Buffer(TimeSpan.FromMilliseconds(num), time)
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count));

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));

Running this code results in a System.StackOverflowException with the following stack trace (the last 3 lines repeat all the way down):

mscorlib.dll!System.Threading.Interlocked.Exchange<System.IDisposable>(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes    
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes    
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes    
...
System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes    
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes    
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes    
...

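OK, the problem seems to come from my use of Observable.Generate(); it depends on the size of the list (num) and occurs regardless of the choice of scheduler.

What am I doing wrong? Or, more generally, what would be the preferred way to create an IObservable from an IEnumerable of events that provide their own timestamps?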

Jer*_*all 4

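(Update: realized I didn't provide an alternative; see the bottom of the answer)

The problem is in how Observable.Generate works: it unfolds a corecursive (think recursion turned inside out) generator based on its arguments. If those arguments end up producing a very deeply nested corecursive generator, you'll blow your stack.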
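To make the roles of the arguments concrete, here is a minimal synchronous sketch of an "unfold" over the same four selectors (initial state, condition, iterate, result selector). The `Unfold` helper and the `UnfoldSketch` class are hypothetical names for illustration; this is not how Rx implements Observable.Generate, and because it is a plain loop it does not reproduce the stack overflow.

```csharp
using System;
using System.Collections.Generic;

static class UnfoldSketch
{
    // Hypothetical helper: produces results from a state that is advanced step by step.
    public static IEnumerable<TResult> Unfold<TState, TResult>(
        TState initialState,
        Func<TState, bool> condition,
        Func<TState, TState> iterate,
        Func<TState, TResult> resultSelector)
    {
        // Iterative on purpose: each step replaces the state instead of nesting a call.
        for (var state = initialState; condition(state); state = iterate(state))
        {
            yield return resultSelector(state);
        }
    }

    static void Main()
    {
        // Same shape as the question's call: start at 0, stop at 3, step by 1.
        foreach (var value in Unfold(0, s => s < 3, s => s + 1, s => s * 10))
        {
            Console.WriteLine(value); // prints 0, 10, 20 on separate lines
        }
    }
}
```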

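
From this point on I'm speculating a lot (I don't have the Rx source in front of me; see below), but I'm willing to bet your definition ends up expanding into something like: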


initial_state =>
generate_next(initial_state) =>
generate_next(generate_next(initial_state)) =>
generate_next(generate_next(generate_next(initial_state))) =>
generate_next(generate_next(generate_next(generate_next(initial_state)))) => ...

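And so on, until your call stack gets big enough to overflow. At, say, a method signature plus your int counter, that would be something like 8-16 bytes per recursive call (more, depending on how the state machine generator is implemented), so 60,000 sounds about right (1M / 16 ~ 62,500 maximum depth).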
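The back-of-the-envelope estimate above can be written out as a small calculation. This is only a sketch of the arithmetic: the 1,000,000-byte stack and the 8 to 16 bytes per frame are the rough figures assumed in this answer, not measured values, and `StackDepthEstimate` and `MaxDepth` are hypothetical names.

```csharp
using System;

static class StackDepthEstimate
{
    // Hypothetical helper: how many nested frames fit in a stack of the given size.
    public static long MaxDepth(long stackBytes, long bytesPerFrame)
    {
        if (bytesPerFrame <= 0)
        {
            throw new ArgumentOutOfRangeException("bytesPerFrame");
        }
        return stackBytes / bytesPerFrame;
    }

    static void Main()
    {
        const long stackBytes = 1000000; // the "1M" assumed above

        foreach (var bytesPerFrame in new long[] { 8, 16 })
        {
            // 8 bytes/frame gives 125000 frames; 16 bytes/frame gives 62500 frames.
            Console.WriteLine(bytesPerFrame + " bytes/frame -> about "
                + MaxDepth(stackBytes, bytesPerFrame) + " frames");
        }
    }
}
```

Real frames in the trace are likely larger than this, so the figures are best read as an upper bound on depth.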


EDIT: Pulled up the source and confirmed: the generated "Run" method looks like this (note the nested calls to Generate):


protected override IDisposable Run(
    IObserver<TResult> observer,
    IDisposable cancel,
    Action<IDisposable> setSink)
{
    if (this._timeSelectorA != null)
    {
        Generate<TState, TResult>.α α =
            new Generate<TState, TResult>.α(
                (Generate<TState, TResult>) this,
                observer,
                cancel);
        setSink(α);
        return α.Run();
    }
    if (this._timeSelectorR != null)
    {
        Generate<TState, TResult>.δ δ =
            new Generate<TState, TResult>.δ(
                (Generate<TState, TResult>) this,
                observer,
                cancel);
        setSink(δ);
        return δ.Run();
    }
    Generate<TState, TResult>._ _ =
        new Generate<TState, TResult>._(
            (Generate<TState, TResult>) this,
            observer,
            cancel);
    setSink(_);
    return _.Run();
}

EDIT: Derp, didn't offer any alternatives... here's one that might work:

(EDIT: fixed Enumerable.Range so the stream size isn't multiplied by chunkSize)


// Requires: using System; using System.Collections.Generic; using System.Linq;
//           using System.Reactive.Concurrency; using System.Reactive.Linq;
const int num = 160000;
const int dist = 10;

var events = new List<DateTimeOffset>();
var curr = DateTimeOffset.Now;
var gap = new Random();
var time = new HistoricalScheduler(curr);

for (int i = 0; i < num; i++)
{
    events.Add(curr);
    curr += TimeSpan.FromMilliseconds(gap.Next(dist));
}

// Size too big? Fine, we'll chunk it up!
const int chunkSize = 10000;
// Ceiling division, so a partial final chunk is still counted
var numberOfChunks = (events.Count + chunkSize - 1) / chunkSize;

// Generate a whole mess of streams based on start/end indices.
// Enumerable.Range takes (start, count), so pass the chunk count itself.
var streams =
    from chunkIndex in Enumerable.Range(0, numberOfChunks)
    let startIdx = chunkIndex * chunkSize
    let endIdx = Math.Min(events.Count, startIdx + chunkSize)
    select Observable.Generate<int, DateTimeOffset>(
        startIdx,
        s => s < endIdx,
        s => s + 1,
        s => events[s],
        s => events[s],
        time);

// E pluribus streamum
var stream = Observable.Concat(streams);

stream.Buffer(TimeSpan.FromMilliseconds(num), time)
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count));

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));
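Because Enumerable.Range takes a start value and a count (not an end index), the count passed to it decides whether the final chunk is included. The following Rx-free sketch can be used to sanity-check the start/end indices on their own; `ChunkBounds` and `Compute` are hypothetical names introduced only for this check.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class ChunkBounds
{
    // Hypothetical helper: returns the [start, end) index pair of every chunk.
    public static List<Tuple<int, int>> Compute(int total, int chunkSize)
    {
        // Ceiling division, so a partial final chunk still gets its own entry.
        int chunks = (total + chunkSize - 1) / chunkSize;

        return Enumerable.Range(0, chunks) // second argument is a count
            .Select(i => Tuple.Create(
                i * chunkSize,
                Math.Min(total, (i + 1) * chunkSize)))
            .ToList();
    }

    static void Main()
    {
        var bounds = Compute(160000, 10000);

        Console.WriteLine(bounds.Count);                       // 16
        Console.WriteLine(bounds.Sum(b => b.Item2 - b.Item1)); // 160000
    }
}
```

If the covered total is smaller than the number of events, some events would never reach the concatenated stream.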