一个枚举器包装器,预先缓冲来自底层枚举器的许多项目

Eug*_*sky 5 .net c# enumerator backpressure

假设我有一些在方法IEnumerator<T>内部进行大量处理的方法MoveNext()

从该枚举器消耗的代码不仅消耗与可用数据一样快的速度,而且偶尔会等待(其细节与我的问题无关)以便同步需要恢复消耗的时间。但是当它下一次调用 时MoveNext(),它需要尽快获得数据。

一种方法是将整个流预先消耗到某个列表或数组结构中以进行即时枚举。然而,这会浪费内存,因为在任何单个时间点,只有一项正在使用,并且在整个数据无法放入内存的情况下,这将是禁止的。

那么,.net 中是否有一些通用的东西可以以某种方式包装枚举器/枚举,以便它提前异步地预迭代底层枚举器几个项目并缓冲结果,以便它的缓冲区中始终有许多可用的项目,并且调用 MoveNext 永远不必等待?显然,消耗的项目(即由调用者的后续 MoveNext 迭代的项目)将从缓冲区中删除。

注意,我正在尝试做的部分也称为BackPressure,并且在 Rx 世界中,已经在RxJava中实现,并且正在Rx.NET中进行讨论。Rx(推送数据的可观察量)可以被认为是枚举器的相反方法(枚举器允许拉取数据)。在拉动方法中,背压相对容易,正如我的回答所示:只需暂停消耗即可。推动的时候比较困难,需要额外的反馈机制。

Asa*_*din 5

自定义可枚举类的更简洁替代方法是执行以下操作:

public static IEnumerable<T> Buffer<T>(this IEnumerable<T> source, int bufferSize)
{
    var queue = new BlockingCollection<T>(bufferSize);

    Task.Run(() => {
        foreach(var i in source) queue.Add(i);
        queue.CompleteAdding();
    });

    return queue.GetConsumingEnumerable();
}
Run Code Online (Sandbox Code Playgroud)

这可以用作:

var slowEnumerable = GetMySlowEnumerable();
var buffered = slowEnumerable.Buffer(10); // Populates up to 10 items on a background thread
Run Code Online (Sandbox Code Playgroud)

  • 好的解决方案。我知道并已经使用了 BlockingCollection (并停止在需要快速的高度并发代码中使用它,因为 [与 ConcurrentQueue+AutoResetEvent 相比它很慢](http://stackoverflow.com/a/29269149/709537)),但我不知道“GetConsumingEnumerable”,在这个简单的生产者/消费者场景中,它看起来像是完美的解决方案。所以我毕竟不必自己动手。 (2认同)