bas*_*ick 9 .net c# scheduler system.reactive
我正在编写一个C#(.NET 4.5)应用程序,用于聚合基于时间的事件以进行报告.为了使我的查询逻辑可以重复用于实时和历史数据,我使用了Reactive Extensions(2.0)及其IScheduler基础结构(HistoricalScheduler和朋友).
例如,假设我们创建的事件列表(按时间顺序排序,但他们可能一致!),其唯一的有效载荷IST他们的时间戳,想知道他们的整个固定期限的缓冲区分配:
const int num = 100000;
const int dist = 10;
var events = new List<DateTimeOffset>();
var curr = DateTimeOffset.Now;
var gap = new Random();
var time = new HistoricalScheduler(curr);
for (int i = 0; i < num; i++)
{
events.Add(curr);
curr += TimeSpan.FromMilliseconds(gap.Next(dist));
}
var stream = Observable.Generate<int, DateTimeOffset>(
0,
s => s < events.Count,
s => s + 1,
s => events[s],
s => events[s],
time);
stream.Buffer(TimeSpan.FromMilliseconds(num), time)
.Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count));
time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));
Run Code Online (Sandbox Code Playgroud)
运行在这个代码的结果System.StackOverflowException与下面的堆栈跟踪(it's最后3线一路向下):
mscorlib.dll!System.Threading.Interlocked.Exchange<System.IDisposable>(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes
...
System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes
...
Run Code Online (Sandbox Code Playgroud)
好吧,这个问题似乎来自我使用的Observable.Generate(),这取决于列表的大小(num)且不论调度的选择.
我究竟做错了什么?或者更一般地,这将是创建一个首选的方式IObservable从IEnumerable自身提供时间戳的事件?
(更新 - 意识到我没有提供替代方案:请参阅答案底部)
\n\n问题在于如何Observable.Generate工作 - 它用于根据参数展开核心递归(认为递归翻转)生成器;如果这些参数最终生成一个非常嵌套的核心递归生成器,那么你的堆栈就会崩溃。
从这一点开始,我猜测了很多(我面前没有 Rx 源)(见下文),但我愿意打赌你的定义最终会扩展到类似以下内容:
initial_state =>\ngenerate_next(initial_state) => \ngenerate_next(generate_next(initial_state)) => \ngenerate_next(generate_next(generate_next(initial_state))) =>\ngenerate_next(generate_next(generate_next(generate_next(initial_state)))) => ...\nRun Code Online (Sandbox Code Playgroud)\n\n如此往复,直到您的调用堆栈变得足够大而溢出。比如说,一个方法签名 + 你的 int 计数器,每个递归调用大约需要 8-16 个字节(更多取决于状态机生成器的实现方式),因此 60,000 听起来不错(1M / 16 ~ 62500最大深度)
\n\n编辑:拉出源代码 - 确认:生成的“运行”方法看起来像这样 - 请注意对以下内容的嵌套调用Generate:
protected override IDisposable Run(\n IObserver<TResult> observer, \n IDisposable cancel, \n Action<IDisposable> setSink)\n{\n if (this._timeSelectorA != null)\n {\n Generate<TState, TResult>.\xce\xb1 \xce\xb1 = \n new Generate<TState, TResult>.\xce\xb1(\n (Generate<TState, TResult>) this, \n observer, \n cancel);\n setSink(\xce\xb1);\n return \xce\xb1.Run();\n }\n if (this._timeSelectorR != null)\n {\n Generate<TState, TResult>.\xce\xb4 \xce\xb4 = \n new Generate<TState, TResult>.\xce\xb4(\n (Generate<TState, TResult>) this, \n observer, \n cancel);\n setSink(\xce\xb4);\n return \xce\xb4.Run();\n }\n Generate<TState, TResult>._ _ = \n new Generate<TState, TResult>._(\n (Generate<TState, TResult>) this, \n observer, \n cancel);\n setSink(_);\n return _.Run();\n}\nRun Code Online (Sandbox Code Playgroud)\n\n编辑:Derp,没有提供任何替代方案...这是一个可能有效的方案:
\n\n(编辑:固定Enumerable.Range,因此流大小不会\xc2\xb4t 乘以chunkSize)
const int num = 160000;\nconst int dist = 10;\n\nvar events = new List<DateTimeOffset>();\nvar curr = DateTimeOffset.Now;\nvar gap = new Random();\nvar time = new HistoricalScheduler(curr);\n\nfor (int i = 0; i < num; i++)\n{\n events.Add(curr);\n curr += TimeSpan.FromMilliseconds(gap.Next(dist));\n}\n\n // Size too big? Fine, we\'ll chunk it up!\nconst int chunkSize = 10000;\nvar numberOfChunks = events.Count / chunkSize;\n\n // Generate a whole mess of streams based on start/end indices\nvar streams = \n from chunkIndex in Enumerable.Range(0, (int)Math.Ceiling((double)events.Count / chunkSize) - 1)\n let startIdx = chunkIndex * chunkSize\n let endIdx = Math.Min(events.Count, startIdx + chunkSize)\n select Observable.Generate<int, DateTimeOffset>(\n startIdx,\n s => s < endIdx,\n s => s + 1,\n s => events[s],\n s => events[s],\n time);\n\n // E pluribus streamum\nvar stream = Observable.Concat(streams);\n\nstream.Buffer(TimeSpan.FromMilliseconds(num), time)\n .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count));\n\ntime.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));\nRun Code Online (Sandbox Code Playgroud)\n
| 归档时间: |
|
| 查看次数: |
756 次 |
| 最近记录: |