Mic*_*tov 7 c# task-parallel-library tpl-dataflow
我需要做这样的工作:
由于我需要控制并行处理的页数,我决定使用TPL数据流:
____________________________
| Data pipe |
| BufferBlock<Page> |
| BoundedCapacity = 1 |
|____________________________|
|
____________________________
| Process images |
| TransformBlock<Page, Page> |
| BoundedCapacity = 1 |
| MaxDegreeOfParallelism = 8 |
|____________________________|
|
____________________________
| Save page |
| ActionBlock<Page> |
| BoundedCapacity = 1 |
| MaxDegreeOfParallelism = 5 |
|____________________________|
Run Code Online (Sandbox Code Playgroud)
现在我需要"过程图像"来并行处理图像,但我想限制我在当前工作中所有并行页面上处理的图像数量.
我可以将TrasnformManyBlock用于"过程图像",但如何在"保存页面"块中将它们收回?
____________________________
| Data pipe |
| BufferBlock<Page> |
| BoundedCapacity = 1 |
|____________________________|
|
___________________________________
| Load images |
| TransformManyBlock<Page, Image[]> |
| BoundedCapacity = 1 |
| MaxDegreeOfParallelism = 8 |
|___________________________________|
/ | \
______________________________________________
_|____________________________________________ |
| Process image | |
| TransformBlock<ImageWithPage, ImageWithPage> | |
| BoundedCapacity = 1 | |
| MaxDegreeOfParallelism = 8 |_|
|______________________________________________|
\ | /
How to group images by page ?
|
____________________________
| Save page |
| ActionBlock<Page> |
| BoundedCapacity = 1 |
| MaxDegreeOfParallelism = 5 |
|____________________________|
Run Code Online (Sandbox Code Playgroud)
最重要的是,其中一个图像可能无法继续,我不想保存包含失败图像的页面.
您可以通过在给定页面的图像到达时进行记录然后在所有图像到达时发送页面来将图像组合在一起。要弄清楚这一点,页面需要知道它包含多少图像,但我假设您知道这一点。
在代码中,它可能如下所示:
public static IPropagatorBlock<TSplit, TMerged>
CreaterMergerBlock<TSplit, TMerged>(
Func<TSplit, TMerged> getMergedFunc, Func<TMerged, int> getSplitCount)
{
var dictionary = new Dictionary<TMerged, int>();
return new TransformManyBlock<TSplit, TMerged>(
split =>
{
var merged = getMergedFunc(split);
int count;
dictionary.TryGetValue(merged, out count);
count++;
if (getSplitCount(merged) == count)
{
dictionary.Remove(merged);
return new[] { merged };
}
dictionary[merged] = count;
return new TMerged[0];
});
}
Run Code Online (Sandbox Code Playgroud)
用法:
var dataPipe = new BufferBlock<Page>();
var splitter = new TransformManyBlock<Page, ImageWithPage>(
page => page.LoadImages(),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
var processImage = new TransformBlock<ImageWithPage, ImageWithPage>(
image =>
{
// process the image here
return image;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
var merger = CreaterMergerBlock(
(ImageWithPage image) => image.Page, page => page.ImageCount);
var savePage = new ActionBlock<Page>(
page => /* save the page here */,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
dataPipe.LinkTo(splitter);
splitter.LinkTo(processImage);
processImage.LinkTo(merger);
merger.LinkTo(savePage);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1357 次 |
| 最近记录: |