Pou*_*sen 5 .net c# task-parallel-library tpl-dataflow
鉴于 TPL 数据流中的以下设置。
var directory = new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles");
var dirBroadcast=new BroadcastBlock<DirectoryInfo>(dir=>dir);
var dirfinder = new TransformManyBlock<DirectoryInfo, DirectoryInfo>((dir) =>
{
return directory.GetDirectories();
});
var tileFilder = new TransformManyBlock<DirectoryInfo, FileInfo>((dir) =>
{
return directory.GetFiles();
});
dirBroadcast.LinkTo(dirfinder);
dirBroadcast.LinkTo(tileFilder);
dirfinder.LinkTo(dirBroadcast);
var block = new XYZTileCombinerBlock<FileInfo>(3, (file) =>
{
var coordinate = file.FullName.Split('\\').Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray();
return XYZTileCombinerBlock<CloudBlockBlob>.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]);
},
(quad) =>
XYZTileCombinerBlock<FileInfo>.QuadKeyToTileXY(quad,
(z, x, y) => new FileInfo(Path.Combine(directory.FullName,string.Format("{0}/{1}/{2}.png", z, x, y)))),
() => new TransformBlock<string, string>((s) =>
{
Trace.TraceInformation("Combining {0}", s);
return s;
}));
tileFilder.LinkTo(block);
using (new TraceTimer("Time"))
{
dirBroadcast.Post(directory);
block.LinkTo(new ActionBlock<FileInfo>((s) =>
{
Trace.TraceInformation("Done combining : {0}", s.Name);
}));
block.Complete();
block.Completion.Wait();
}
Run Code Online (Sandbox Code Playgroud)
我想知道如何将其标记为完成,因为循环。一个目录发布到 dirBroadcast 广播器,该广播器发布到 dirfinder 可能会将新目录回发到广播器,所以我不能简单地将其标记为完整,因为它会阻止从 dirfinder 添加的任何目录。我应该重新设计它以跟踪目录的数量,还是在 TPL 中有任何相关内容。
如果您的代码的目的是使用某种并行性遍历目录结构,那么我建议不要使用 TPL Dataflow 并使用 Microsoft 的 Reactive Framework。我认为事情变得简单多了。
我就是这样做的。
首先定义一个递归函数来构建目录列表:
Func<DirectoryInfo, IObservable<DirectoryInfo>> recurse = null;
recurse = di =>
Observable
.Return(di)
.Concat(di.GetDirectories()
.ToObservable()
.SelectMany(di2 => recurse(di2)))
.ObserveOn(Scheduler.Default);
Run Code Online (Sandbox Code Playgroud)
这会执行目录的递归并使用默认的 Rx 调度程序,这会导致可观察对象并行运行。
因此,通过使用recurse输入进行调用DirectoryInfo,我可以获得输入目录及其所有后代的可观察列表。
现在我可以构建一个相当直接的查询来获得我想要的结果:
var query =
from di in recurse(new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles"))
from fi in di.GetFiles().ToObservable()
let zxy =
fi
.FullName
.Split('\\')
.Reverse()
.Take(3)
.Reverse()
.Select(s => int.Parse(Path.GetFileNameWithoutExtension(s)))
.ToArray()
let suffix = String.Format("{0}/{1}/{2}.png", zxy[0], zxy[1], zxy[2])
select new FileInfo(Path.Combine(di.FullName, suffix));
Run Code Online (Sandbox Code Playgroud)
现在我可以像这样操作查询:
query
.Subscribe(s =>
{
Trace.TraceInformation("Done combining : {0}", s.Name);
});
Run Code Online (Sandbox Code Playgroud)
现在我可能在您的自定义代码中遗漏了一些内容,但如果这是您想要采用的方法,我相信您可以很容易地解决任何逻辑问题。
当子目录和文件用完时,此代码会自动处理完成。
要将 Rx 添加到项目中,请在 NuGet 中查找“Rx-Main”。
| 归档时间: |
|
| 查看次数: |
731 次 |
| 最近记录: |