Multiple threads accessing an IEnumerable that uses yield

Mar*_*ery 5 c# ienumerable ienumerator

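I'm using a third-party library to iterate over some very large flat files, which can take a long time. The library provides an enumerator, so you can yield each result and process it while the enumerator pulls the next item from the flat file.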

For example:

IEnumerable<object> GetItems()
{
    var cursor = new Cursor();

    try
    {
        cursor.Open();

        while (!cursor.EOF)
        {
            yield return new object(); // placeholder: build the item from the current record

            cursor.MoveNext();
        }
    }
    finally
    {
        if (cursor.IsOpen)
        {
            cursor.Close();
        }
    }
}

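What I want to achieve is to have two consumers of the same Enumerable, so that I don't have to extract the information twice, and so that each consumer can still process each item as it arrives instead of waiting for all the items to arrive at once.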

IEnumerable<object> items = GetItems();

new Thread(() => SaveToDatabase(items)).Start();
new Thread(() => SaveSomewhereElse(items)).Start();

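I think what I want is something like this:

"If the item the consumer is asking for has already been fetched, yield it; otherwise move next and wait." But I realize there could be MoveNext() conflicts between the two threads.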

Does something like this already exist? If not, any ideas on how to implement it?

Thanks
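A note on why the two-thread snippet above would not share one read of the file: calling an iterator method such as GetItems() runs none of its body, and every foreach over the returned IEnumerable starts the body over, including cursor.Open(). The sketch below is a minimal illustration under stated assumptions: the class IteratorReplayDemo and its OpenCount counter are hypothetical stand-ins for the third-party cursor, used only to count how often the iterator body starts.

```csharp
using System;
using System.Collections.Generic;

public static class IteratorReplayDemo
{
    // Hypothetical counter standing in for "how many times was the cursor opened".
    public static int OpenCount;

    // Same shape as GetItems() in the question, minus the third-party cursor.
    public static IEnumerable<int> GetItems()
    {
        // Runs on the first MoveNext() of each new enumeration,
        // not when GetItems() itself is called.
        OpenCount++;
        for (int i = 0; i < 3; i++)
        {
            yield return i;
        }
    }
}
```

Calling GetItems() leaves OpenCount at 0; enumerating the same returned object with two foreach loops leaves it at 2. Two threads each enumerating `items` would therefore each open their own cursor and read the whole file.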

sll*_*sll 5

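A pipeline pattern implementation using .NET 4 BlockingCollection<T> and TPL Tasks is what you're looking for. See my answer in this StackOverflow post for a full example.

Example: 3 simultaneous consumers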

using System.Collections.Concurrent;
using System.Threading.Tasks;

public class Pipeline
{
    BlockingCollection<string> queue = new BlockingCollection<string>();

    public void Start()
    {
        var producerWorker = Task.Factory.StartNew(() => ProducerImpl());
        var consumer1 = Task.Factory.StartNew(() => ConsumerImpl());
        var consumer2 = Task.Factory.StartNew(() => ConsumerImpl());
        var consumer3 = Task.Factory.StartNew(() => ConsumerImpl());

        Task.WaitAll(producerWorker, consumer1, consumer2, consumer3);
    }

    private void ProducerImpl()
    {
        try
        {
            // For each record in the file:
            //   1. Read the raw data from the file
            //   2. Preprocess it
            //   3. Add the item to the queue: queue.Add(item);
        }
        finally
        {
            // Signal that no more items are coming; otherwise the consumers'
            // GetConsumingEnumerable() loops never end and WaitAll hangs.
            queue.CompleteAdding();
        }
    }

    // ConsumerImpl must be thread safe
    // to allow launching multiple consumers simultaneously
    private void ConsumerImpl()
    {
        foreach (var item in queue.GetConsumingEnumerable())
        {
            // TODO: process item
        }
    }
}

Let me know if anything is still unclear.

High-level diagram of the pipeline flow: (image not available)

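  • The problem here is that each item is processed by only one of the three consumers. The OP's example has one producer and then two consumers that each process every item, consuming in parallel, rather than a pipeline in which one stage processes an item before the next. That's why your solution doesn't apply. (2 upvotes)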
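One way to adapt the BlockingCollection<T> pipeline to what the comment describes is to broadcast instead of dispatch: the producer enumerates the source once and adds every item to a separate queue per consumer, so each consumer sees every item in order. This is a minimal sketch, not part of the answer above; the Broadcast class, its Run method and the capacity parameter are hypothetical names, and Task.Run needs .NET 4.5 or later (on .NET 4, Task.Factory.StartNew works the same way here).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class Broadcast
{
    // Hypothetical helper (not a BCL API): enumerates `source` exactly once on a
    // producer task and hands every item to every consumer via its own queue.
    public static Task Run<T>(IEnumerable<T> source, int capacity, params Action<T>[] consumers)
    {
        // One bounded queue per consumer; Add blocks once a consumer falls
        // `capacity` items behind, so memory use stays bounded.
        BlockingCollection<T>[] queues = consumers
            .Select(_ => new BlockingCollection<T>(capacity))
            .ToArray();

        Task producer = Task.Run(() =>
        {
            try
            {
                foreach (T item in source)
                {
                    foreach (BlockingCollection<T> queue in queues)
                    {
                        queue.Add(item);
                    }
                }
            }
            finally
            {
                // Lets each consumer's GetConsumingEnumerable() loop finish.
                foreach (BlockingCollection<T> queue in queues)
                {
                    queue.CompleteAdding();
                }
            }
        });

        Task[] consumerTasks = consumers
            .Select((consume, index) => Task.Run(() =>
            {
                // Each consumer drains only its own queue, on its own task.
                foreach (T item in queues[index].GetConsumingEnumerable())
                {
                    consume(item);
                }
            }))
            .ToArray();

        return Task.WhenAll(consumerTasks.Concat(new[] { producer }));
    }
}
```

With the question's names this would be called as something like `Broadcast.Run(GetItems(), 100, SaveToDatabase, SaveSomewhereElse).Wait();`, assuming both save methods take a single `object`. One caveat of this sketch: if a consumer throws, its queue stops draining, and the producer blocks forever once that queue is full.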

Ser*_*rvy 3

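Essentially, what you want is to cache the data of an IEnumerable<T>, but without waiting for the sequence to finish before storing it. You can do it like this: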

using System.Collections;
using System.Collections.Generic;

public static class CacheExtensions
{
    public static IEnumerable<T> Cache<T>(this IEnumerable<T> source)
    {
        return new CacheEnumerator<T>(source);
    }

    private class CacheEnumerator<T> : IEnumerable<T>
    {
        private readonly CacheEntry<T> cacheEntry;

        public CacheEnumerator(IEnumerable<T> sequence)
        {
            cacheEntry = new CacheEntry<T>(sequence.GetEnumerator());
        }

        public IEnumerator<T> GetEnumerator()
        {
            // Every enumerator, including ones created after the source is
            // exhausted, reads through the locked cache.
            return IterateSequence(cacheEntry).GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return this.GetEnumerator();
        }
    }

    private static IEnumerable<T> IterateSequence<T>(CacheEntry<T> entry)
    {
        T item;
        for (int i = 0; entry.TryGetItemAt(i, out item); i++)
        {
            yield return item;
        }
    }

    private class CacheEntry<T>
    {
        private readonly IEnumerator<T> sequence;
        private readonly List<T> cachedValues = new List<T>();

        // One lock per cache instance; a static field would be shared by
        // every cache with the same T.
        private readonly object key = new object();
        private bool fullyPopulated;

        public CacheEntry(IEnumerator<T> sequence)
        {
            this.sequence = sequence;
        }

        // Gets the item at the given index, first moving it from the source
        // sequence into the cache if necessary. Returns false if the source
        // ran out before that index.
        //
        // Thread safe: every read and write of the cache happens under the lock,
        // because a List<T> must not be read while another thread adds to it.
        // Trade-off: a reader of an already cached item may briefly wait while
        // another thread is inside MoveNext().
        public bool TryGetItemAt(int index, out T item)
        {
            lock (key)
            {
                // Each enumerator asks for indexes in order, so at most one new
                // item is ever needed; only one thread can advance the source.
                if (index >= cachedValues.Count && !fullyPopulated)
                {
                    if (sequence.MoveNext())
                    {
                        cachedValues.Add(sequence.Current);
                    }
                    else
                    {
                        sequence.Dispose();
                        fullyPopulated = true;
                    }
                }

                if (index < cachedValues.Count)
                {
                    item = cachedValues[index];
                    return true;
                }

                item = default(T);
                return false;
            }
        }
    }
}

Usage example:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    private static IEnumerable<int> InterestingIntGenerationMethod(int maxValue)
    {
        for (int i = 0; i < maxValue; i++)
        {
            Thread.Sleep(1000);
            Console.WriteLine("actually generating value: {0}", i);
            yield return i;
        }
    }

    public static void Main(string[] args)
    {
        IEnumerable<int> sequence = InterestingIntGenerationMethod(10)
            .Cache();

        int numThreads = 3;
        for (int i = 0; i < numThreads; i++)
        {
            int taskID = i; // copy the loop variable so each lambda captures its own value
            Task.Factory.StartNew(() =>
            {
                foreach (int value in sequence)
                {
                    Console.WriteLine("Task: {0} Value:{1}",
                        taskID, value);
                }
            });
        }

        Console.WriteLine("Press any key to exit...");
        Console.ReadKey(true);
    }
}