如何在ConcurrentQueue或ConcurrentStack中使用IObservable/IObserver

Jam*_*ack 5 plinq concurrent-programming system.reactive c#-4.0

我意识到当我尝试使用多个线程处理并发队列中的项目而多个线程可以将项目放入其中时,理想的解决方案是将Reactive Extensions与Concurrent数据结构一起使用.

我原来的问题是:

使用ConcurrentQueue时,尝试在并行循环时出列

所以我很好奇是否有任何方法可以让LINQ(或PLINQ)查询在项目被放入其中时不断出列.

我试图让这个工作的方式,我可以有n个生产者进入队列和有限数量的线程进行处理,所以我不会重载数据库.

如果我可以使用Rx框架,那么我希望我可以启动它,如果在100ms内放置了100个项目,那么作为PLINQ查询一部分的20个线程将只处理整个队列.

我正在努力合作的技术有三种:

  1. Rx框架(Reactive LINQ)
  2. PLING
  3. System.Collections.Concurrent结构

Lee*_*ell 6

Drew是对的,我认为ConcurrentQueue虽然听起来很完美,但实际上是BlockingCollection使用的底层数据结构.似乎也很重要.查看本书的第7章* http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 它将会解释如何使用BlockingCollection并让多个生产者和多个消费者各自取消"队列".您将需要查看"GetConsumingEnumerable()"方法,并可能只调用.ToObservable().

*本书的其余部分非常平均.

编辑:

这是一个示例程序,我认为你想做什么?

class Program
{
    private static ManualResetEvent _mre = new ManualResetEvent(false);
    static void Main(string[] args)
    {
        var theQueue = new BlockingCollection<string>();
        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));


        LoadQueue(theQueue, "Producer A");
        LoadQueue(theQueue, "Producer B");
        LoadQueue(theQueue, "Producer C");

        _mre.Set();

        Console.WriteLine("Processing now....");

        Console.ReadLine();
    }

    private static void ProcessNewValue(string value, string consumerName, int delay)
    {
        Thread.SpinWait(delay);
        Console.WriteLine("{1} consuming {0}", value, consumerName);
    }

    private static void LoadQueue(BlockingCollection<string> target, string prefix)
    {
        var thread = new Thread(() =>
                                    {
                                        _mre.WaitOne();
                                        for (int i = 0; i < 100; i++)
                                        {
                                            target.Add(string.Format("{0} {1}", prefix, i));
                                        }
                                    });
        thread.Start();
    }
}
Run Code Online (Sandbox Code Playgroud)


Dre*_*rsh 3

我不知道如何最好地使用 Rx 来完成此任务,但我建议仅使用BlockingCollection<T>生产者-消费者模式。您的主线程将项目添加到集合中,ConcurrentQueue<T>默认情况下使用底层。Task然后,您可以在它之前启动一个单独的组件,该组件可以Parallel::ForEach同时BlockingCollection<T>处理集合中对系统有意义的尽可能多的项目。现在,您可能还想研究使用GetConsumingPartitionerParallelExtensions 库的方法,以便最有效,因为在这种情况下,默认分区程序将产生比您想要的更多的开销。您可以从这篇博文中了解更多相关信息。

当主线程完成后,您可以调用CompleteAddingBlockingCollection<T>启动Task::WaitTask等待所有使用者完成对集合中所有项目的处理。