队列到ConcurrentQueue

web*_*ad3 2 c# queue parallel-processing backgroundworker

我在C#(4.0)中有一个常规的Queue对象,我正在使用访问此Queue的BackgroundWorkers.

我使用的代码如下:

   do
    {
        while (dataQueue.Peek() == null // nothing waiting yet 
            && isBeingLoaded == true // and worker 1 still actively adding stuff
        )
            System.Threading.Thread.Sleep(100);

        // otherwise ready to do something: 
        if (dataQueue.Peek() != null) // because maybe the queue is complete and also empty 
        {
            string companyId = dataQueue.Dequeue();
            processLists(companyId);
            // use up the stuff here //
        } // otherwise nothing was there yet, it will resolve on the next loop.
    } while (isBeingLoaded == true // still have stuff coming at us 
           || dataQueue.Peek() != null);   // still have stuff we haven’t done
Run Code Online (Sandbox Code Playgroud)

但是,我想在处理线程时我应该使用a ConcurrentQueue.我想知道是否有如何ConcurrentQueue在上面的Do While循环中使用a的例子?

我尝试使用TryPeek的一切都无法正常工作..

有任何想法吗?

use*_*116 5

您可以将a BlockingCollection<T>用作生产者 - 消费者队列.

我的回答对你的架构做了一些假设,但你可以按照你认为合适的方式塑造它:

public void Producer(BlockingCollection<string> ids)
{
    // assuming this.CompanyRepository exists
    foreach (var id in this.CompanyRepository.GetIds())
    {
        ids.Add(id);
    }

    ids.CompleteAdding(); // nothing left for our workers
}

public void Consumer(BlockingCollection<string> ids)
{
    while (true)
    {
       string id = null;
       try
       {
           id = ids.Take();
       } catch (InvalidOperationException) {
       }

       if (id == null) break;

       processLists(id);
    }
}
Run Code Online (Sandbox Code Playgroud)

您可以根据需要调整尽可能多的消费者:

var companyIds = new BlockingCollection<string>();
Producer(companyIds);

Action process = () => Consumer(companyIds);

// 2 workers
Parallel.Invoke(process, process);
Run Code Online (Sandbox Code Playgroud)

  • 我想这就是OP需要的东西.他可能*想要*不同的东西,但这就是他所需要的. (2认同)