Mar*_*ery 5 c# ienumerable ienumerator
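I'm using a third-party library to iterate over some very large flat files, which can take a long time. The library provides an enumerator, so you can yield each result and process it while the enumerator goes on to pull the next item from the flat file.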
For example:
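// Cursor is the third-party library's type; its members are used as in the original question.
IEnumerable<object> GetItems()
{
    var cursor = new Cursor();
    try
    {
        cursor.Open();
        while (!cursor.EOF)
        {
            yield return new object(); // placeholder: build the item from the current record
            cursor.MoveNext();
        }
    }
    finally
    {
        // Runs when enumeration finishes or when the consumer disposes the enumerator early.
        if (cursor.IsOpen)
        {
            cursor.Close();
        }
    }
}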
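What I want to achieve is to have two consumers of the same enumerable, so that I don't have to extract the information twice, while each consumer can still process every item as it arrives instead of waiting for all of the items to arrive at once.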
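using System.Threading;

IEnumerable<object> items = GetItems();
// Thread needs a delegate; SaveToDateBase(items) alone would run the method immediately.
new Thread(() => SaveToDateBase(items)).Start();
new Thread(() => SaveSomewhereElse(items)).Start();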
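I think what I'm trying to achieve is something like:
"If the item the consumer is asking for has already been extracted, yield it; otherwise move next and wait." But I realize there could be MoveNext() conflicts between the two threads.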
Does something like this already exist? If not, does anyone have ideas on how it could be implemented?
Thanks
A pipeline-pattern implementation using the .NET 4 BlockingCollection<T> and TPL Tasks is what you are looking for. See my answer in this StackOverflow post for a complete example.
Example: 3 simultaneous consumers
using System.Collections.Concurrent;
using System.Threading.Tasks;

BlockingCollection<string> queue = new BlockingCollection<string>();

public void Start()
{
    var producerWorker = Task.Factory.StartNew(() => ProducerImpl());
    var consumer1 = Task.Factory.StartNew(() => ConsumerImpl());
    var consumer2 = Task.Factory.StartNew(() => ConsumerImpl());
    var consumer3 = Task.Factory.StartNew(() => ConsumerImpl());
    Task.WaitAll(producerWorker, consumer1, consumer2, consumer3);
}

private void ProducerImpl()
{
    try
    {
        // 1. Read raw data from a file
        // 2. Preprocess it
        // 3. Add each item to the queue
        foreach (string item in ReadAndPreprocess()) // hypothetical helper for steps 1-2
        {
            queue.Add(item);
        }
    }
    finally
    {
        // Tell the consumers that no more items are coming; without this,
        // GetConsumingEnumerable() never ends and Task.WaitAll hangs.
        queue.CompleteAdding();
    }
}

// ConsumerImpl must be thread safe
// to allow launching multiple consumers simultaneously.
private void ConsumerImpl()
{
    foreach (var item in queue.GetConsumingEnumerable())
    {
        // TODO: process item
    }
}
Let me know if anything is still unclear.
High-level diagram of the pipeline flow: [image not available]
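One caveat when applying this pattern to the question: with a single BlockingCollection<T>, each item is taken by exactly one consumer, so the three consumers in the example split the work between them rather than each seeing every item. If every consumer must process every item (as with SaveToDateBase and SaveSomewhereElse in the question), a minimal sketch is to give each consumer its own queue and have the producer add each item to all of them. The names BroadcastPipeline and Broadcast are hypothetical, and the source is assumed to be any IEnumerable<T> such as the question's GetItems(); bounding each queue keeps memory flat for very large files.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BroadcastPipeline
{
    // Hypothetical helper: enumerates `source` once and passes every item to
    // every consumer. Each consumer drains its own bounded queue on its own task.
    public static void Broadcast<T>(IEnumerable<T> source,
                                    IList<Action<T>> consumers,
                                    int boundedCapacity = 100)
    {
        BlockingCollection<T>[] queues = consumers
            .Select(_ => new BlockingCollection<T>(boundedCapacity))
            .ToArray();

        Task[] consumerTasks = consumers
            .Select((consume, i) => Task.Factory.StartNew(() =>
            {
                foreach (T item in queues[i].GetConsumingEnumerable())
                {
                    consume(item);
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        try
        {
            // Producer side: the source is read exactly once, on the calling thread.
            foreach (T item in source)
            {
                foreach (BlockingCollection<T> queue in queues)
                {
                    queue.Add(item); // blocks while this consumer's queue is full
                }
            }
        }
        finally
        {
            // Lets each consumer's GetConsumingEnumerable() loop finish.
            foreach (BlockingCollection<T> queue in queues)
            {
                queue.CompleteAdding();
            }
        }

        Task.WaitAll(consumerTasks);
        foreach (BlockingCollection<T> queue in queues)
        {
            queue.Dispose();
        }
    }
}
```

For the question this would be called roughly as Broadcast(GetItems(), new List<Action<object>> { SaveToDateBase, SaveSomewhereElse }), assuming those methods are changed to take a single item instead of the whole sequence. Each queue has a single reader, so every consumer sees the items in source order. One limitation of the sketch: if a consumer throws, its queue stops draining and the producer eventually blocks on Add; passing a CancellationToken to Add and GetConsumingEnumerable is the usual way to break out of that.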

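Essentially, what you want is to cache the data of an IEnumerable<T> without waiting for the sequence to finish before storing it. You can do it like this: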
using System.Collections;
using System.Collections.Generic;

public static class EnumerableCacheExtensions
{
    public static IEnumerable<T> Cache<T>(this IEnumerable<T> source)
    {
        return new CacheEnumerator<T>(source);
    }

    private class CacheEnumerator<T> : IEnumerable<T>
    {
        private readonly CacheEntry<T> cacheEntry;

        public CacheEnumerator(IEnumerable<T> sequence)
        {
            cacheEntry = new CacheEntry<T>();
            cacheEntry.Sequence = sequence.GetEnumerator();
            cacheEntry.CachedValues = new List<T>();
        }

        public IEnumerator<T> GetEnumerator()
        {
            // Each caller gets its own enumerator over the shared cache; items are
            // pulled from the source only when a caller runs past the cached ones.
            return iterateSequence<T>(cacheEntry).GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return this.GetEnumerator();
        }
    }

    private static IEnumerable<T> iterateSequence<T>(CacheEntry<T> entry)
    {
        T value;
        for (int i = 0; entry.tryGetItemAt(i, out value); i++)
        {
            yield return value;
        }
    }

    private class CacheEntry<T>
    {
        public bool FullyPopulated { get; private set; }
        public IEnumerator<T> Sequence { get; set; }
        // Every item stays here for the lifetime of the cache,
        // so memory grows with the length of the source.
        public List<T> CachedValues { get; set; }

        // Per-instance locks (a static field in a generic class would be shared
        // by every cache of the same T).
        // listLock guards CachedValues and FullyPopulated, because List<T> is not
        // safe for concurrent reads and writes.
        private readonly object listLock = new object();
        // fetchLock ensures only one thread at a time calls Sequence.MoveNext().
        private readonly object fetchLock = new object();

        // Returns true/false if the answer is already known from the cache, null otherwise.
        private bool? readCached(int index, out T value)
        {
            lock (listLock)
            {
                if (index < CachedValues.Count)
                {
                    value = CachedValues[index];
                    return true;
                }
                value = default(T);
                if (FullyPopulated)
                    return false;
                return null;
            }
        }

        /// <summary>
        /// Get the item at the provided index. If it is not cached yet, take the next
        /// item from the input sequence and move it to the cache.
        ///
        /// The method is thread safe.
        /// </summary>
        /// <returns>True if the cache already had the item or it was moved to the cache,
        /// false if there were no more items in the sequence.</returns>
        public bool tryGetItemAt(int index, out T value)
        {
            // If the cache already has the item (or the source is exhausted), only the
            // cheap list lock is needed, so readers never wait on a slow MoveNext().
            bool? cached = readCached(index, out value);
            if (cached.HasValue)
                return cached.Value;

            lock (fetchLock)
            {
                // Re-check the early-exit conditions in case another thread fetched
                // the item while we were waiting on the lock.
                cached = readCached(index, out value);
                if (cached.HasValue)
                    return cached.Value;

                // We actually need to get the next item from the sequence. Readers advance
                // one index at a time, so here index == CachedValues.Count.
                // The list lock is not held during the (possibly slow) MoveNext().
                bool hasNext = Sequence.MoveNext();
                lock (listLock)
                {
                    if (hasNext)
                    {
                        value = Sequence.Current;
                        CachedValues.Add(value);
                        return true;
                    }
                    Sequence.Dispose();
                    FullyPopulated = true;
                    return false;
                }
            }
        }
    }
}
Usage example:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    private static IEnumerable<int> interestingIntGenerationMethod(int maxValue)
    {
        for (int i = 0; i < maxValue; i++)
        {
            // Simulate a slow source, such as a large flat file.
            Thread.Sleep(1000);
            Console.WriteLine("actually generating value: {0}", i);
            yield return i;
        }
    }

    public static void Main(string[] args)
    {
        // Each value is generated once, however many tasks enumerate the sequence.
        IEnumerable<int> sequence = interestingIntGenerationMethod(10)
            .Cache();
        int numThreads = 3;
        for (int i = 0; i < numThreads; i++)
        {
            int taskID = i;
            Task.Factory.StartNew(() =>
            {
                foreach (int value in sequence)
                {
                    Console.WriteLine("Task: {0} Value:{1}",
                        taskID, value);
                }
            });
        }
        Console.WriteLine("Press any key to exit...");
        Console.ReadKey(true);
    }
}