Eoi*_*ell 5 c# queue message-queue task-parallel-library c#-4.0
我不确定以下是否可行,但我想以一种受限制的方式在Paralell中调用一些Actions,但是要保持处理流程的连续性,而不是恢复使用定时器或循环/睡眠周期.
到目前为止,我已经开始工作了它从一些来源加载大量输入......然后以受控方式并行处理它们并像下面一样循环.
static void Main(string[] args)
{
while(true) //Simulate a Timer Elapsing...
{
IEnumerable<int> inputs = new List<int>() {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
//Simulate querying database queue tables for next batch of entries
RunAllActions(inputs, 3); //Max 3 at a time.
}
}
static void RunAllActions(IEnumerable<int> inputs, int maxConcurrency)
{
var options = new ParallelOptions() {MaxDegreeOfParallelism = maxConcurrency};
Parallel.ForEach<int>(inputs, options, DoWork);
//Blocks here until all inputs are processed.
Console.WriteLine("Batch of Work Done!!!");
}
static void DoWork(int input)
{
Console.WriteLine("Starting Task {0}", input);
System.Threading.Thread.Sleep(3000);
Console.WriteLine("Finishing Task {0}", input);
}
Run Code Online (Sandbox Code Playgroud)
我想知道的是,TPL中是否有一个构造可以用来保持它始终运行...这样我就可以用MessageQueue接收事件替换"Timer Elapsing"和"Database Polling".
以下是我想要实现的粗略版本......除此之外我还可以解决它,但我想知道这种模式是否构建在TPL中.
internal class Engine
{
private MessageQueue mq;
private Queue<int> myInternalApplicationQueue;
public Engine()
{
//Message Queue to get new task inputs from
mq = new MessageQueue();
mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
// internal Queue to put them in.
myInternalApplicationQueue = new Queue<int>();
}
void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
//On MQ Receive, pop the input in a queue in my app
int input = (int) e.Message.Body;
myInternalApplicationQueue.Enqueue(input);
}
public void StartWorking()
{
//Once this gets called, it doesn't stop... it just keeps processing/watching that queue
//processing the tasks as fast as it's allowed while the app is running.
var options = new ParallelOptions() { MaxDegreeOfParallelism = 3 };
Parallel.KeepWorkingOnQueue<int>(myInternalApplicationQueue, options, DoWork);
// ^^^^^^^^^^^^^^^^^^ <----- THIS GUY
}
}
Run Code Online (Sandbox Code Playgroud)
您可以使用它BlockingCollection<T>来处理这种类型的操作,这实际上是生产者/消费者方案.
基本上,你设置BlockingCollection<T>并使用它作为你的"制作人".然后,您将有三个(或任意数量)的消费者任务(通常设置为长时间运行的任务)处理元素(通过调用blockingCollection.GetConsumingEnumerable()标准的foreach循环).
然后,根据需要将项目添加到集合中,并且将不断处理它们.完成后,你会调用BlockingCollection<T>.CompleteAdding,这将导致foreach循环完成,整个过程停止.
作为一个侧面说明-你通常不希望使用Parallel.ForEach在GetConsumingEnumerable()从BlockingCollection<T>-至少不会,除非你自己处理分区.通常最好使用多个任务并按顺序迭代.原因是默认分区方案Parallel.ForEach将导致问题(它等待数据的"块"可用,而不是立即处理项目,并且"块"随着时间的推移变得越来越大).