Dav*_*vid 4 c# parallel-processing task-parallel-library
在我的应用我有三个班,Extractor,Transformer并且Loader,由第四类协调Coordinator.Extractor,Transformer并且Loader非常简单,并执行以下操作:
Extractor
公开名为Resultstype 的成员IEnumerable<string>,例如通过从文本文件中读取.提取应该是同步的.
Transformer
公开一个被调用的成员Transform,它接受一个字符串并通过某个过程将其转换为另一个字符串,这个过程预计很费时(在这里使用并行处理).
Loader
公开一个名为的成员Load,该成员接受一个字符串并将其加载到某个最终形式(例如另一个文本文件)中.加载应该是同步的.
这些Coordinator类协调三个操作.转换过程应该并行完成,然后将结果推送到加载程序读取的队列.Coordinator的Run()方法如下:
Extractor extractor = new Extractor();
Transformer transformer = new Transformer();
Loader loader = new Loader();
ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();
Parallel.ForEach(extractor.Results, x => outputs.Enqueue(transformer.Transform(x)));
foreach(string output in outputs)
{
loader.Load(output);
}
Run Code Online (Sandbox Code Playgroud)
这样做很有效,除了必须在任何加载完成之前完成ALL转换 - 即Parallel.ForEach()在下一个foreach开始之前完成.我希望每个输出在准备就绪后立即传递给加载器.
我也试过这个:
Extractor extractor = new Extractor();
Transformer transformer = new Transformer();
Loader loader = new Loader();
ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();
foreach (string input in extractor.Results)
{
string input1 = input;
Task task = Task.Factory.StartNew(
() => outputs.Enqueue(transformer.Transform(input1)));
}
foreach(string output in outputs)
{
loader.Load(output);
}
Run Code Online (Sandbox Code Playgroud)
但是foreach在将任何输出添加到队列之前,底部的循环会被击中,因此它就会退出.
一旦调用结果可用,我如何才能完成加载transformer.Transform()?
尝试BlockingCollection改为Parallel.Invoke.在下面的示例中,GetConsumingEnumerable(Producer-Consumer模式的Consumer部分)CompleteAdding在调用之前不会完成,因此load将一直运行直到fill完成.
var outputs = new BlockingCollection<string>();
// aka Producer
Action fill = () => {
Parallel.ForEach(extractor.Results, x => outputs.Add(transformer.Transform(x)));
outputs.CompleteAdding();
};
// aka Consumer
Action load = () => {
foreach(var o in outputs.GetConsumingEnumerable())
loader.Load(o);
}
Parallel.Invoke(fill, load);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1494 次 |
| 最近记录: |