如何使用C#4中的TPL创建一个恒定的Processing"Flow"

Eoi*_*ell 5 c# queue message-queue task-parallel-library c#-4.0

我不确定以下是否可行,但我想以一种受限制的方式在Paralell中调用一些Actions,但是要保持处理流程的连续性,而不是恢复使用定时器或循环/睡眠周期.

到目前为止,我已经开始工作了它从一些来源加载大量输入......然后以受控方式并行处理它们并像下面一样循环.

static void Main(string[] args)
{
    while(true) //Simulate a Timer Elapsing...
    {
        IEnumerable<int> inputs = new List<int>() {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};  
        //Simulate querying database queue tables for next batch of entries

        RunAllActions(inputs, 3); //Max 3 at a time.
    }
}

static void RunAllActions(IEnumerable<int> inputs, int maxConcurrency)
{
    var options = new ParallelOptions() {MaxDegreeOfParallelism = maxConcurrency};

    Parallel.ForEach<int>(inputs, options, DoWork);
    //Blocks here until all inputs are processed.
    Console.WriteLine("Batch of Work Done!!!");
}

static void DoWork(int input)
{
    Console.WriteLine("Starting Task {0}", input);
    System.Threading.Thread.Sleep(3000);
    Console.WriteLine("Finishing Task {0}", input);
}
Run Code Online (Sandbox Code Playgroud)

我想知道的是,TPL中是否有一个构造可以用来保持它始终运行...这样我就可以用MessageQueue接收事件替换"Timer Elapsing"和"Database Polling".

以下是我想要实现的粗略版本......除此之外我还可以解决它,但我想知道这种模式是否构建在TPL中.

internal class Engine
{
    private MessageQueue mq;
    private Queue<int> myInternalApplicationQueue;

    public Engine()
    {
        //Message Queue to get new task inputs from
        mq = new MessageQueue();
        mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted);

        // internal Queue to put them in.
        myInternalApplicationQueue = new Queue<int>();
    }

    void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        //On MQ Receive, pop the input in a queue in my app
        int input = (int) e.Message.Body;

        myInternalApplicationQueue.Enqueue(input);
    }

    public void StartWorking()
    {
        //Once this gets called, it doesn't stop... it just keeps processing/watching that queue
        //processing the tasks as fast as it's allowed while the app is running.
        var options = new ParallelOptions() { MaxDegreeOfParallelism = 3 };
        Parallel.KeepWorkingOnQueue<int>(myInternalApplicationQueue, options, DoWork);
        //       ^^^^^^^^^^^^^^^^^^ <----- THIS GUY
    }

}
Run Code Online (Sandbox Code Playgroud)

Ree*_*sey 6

您可以使用它BlockingCollection<T>来处理这种类型的操作,这实际上是生产者/消费者方案.

基本上,你设置BlockingCollection<T>并使用它作为你的"制作人".然后,您将有三个(或任意数量)的消费者任务(通常设置为长时间运行的任务)处理元素(通过调用blockingCollection.GetConsumingEnumerable()标准的foreach循环).

然后,根据需要将项目添加到集合中,并且将不断处理它们.完成后,你会调用BlockingCollection<T>.CompleteAdding,这将导致foreach循环完成,整个过程停止.

作为一个侧面说明-你通常不希望使用Parallel.ForEachGetConsumingEnumerable()BlockingCollection<T>-至少不会,除非你自己处理分区.通常最好使用多个任务并按顺序迭代.原因是默认分区方案Parallel.ForEach将导致问题(它等待数据的"块"可用,而不是立即处理项目,并且"块"随着时间的推移变得越来越大).