如何标记一个 TPL 数据流周期完成?

Pou*_*sen 5 .net c# task-parallel-library tpl-dataflow

鉴于 TPL 数据流中的以下设置。

var directory = new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles");

var dirBroadcast=new BroadcastBlock<DirectoryInfo>(dir=>dir);

var dirfinder = new TransformManyBlock<DirectoryInfo, DirectoryInfo>((dir) =>
{
    return directory.GetDirectories();

});
var tileFilder = new TransformManyBlock<DirectoryInfo, FileInfo>((dir) =>
{
    return directory.GetFiles();
});
dirBroadcast.LinkTo(dirfinder);
dirBroadcast.LinkTo(tileFilder);
dirfinder.LinkTo(dirBroadcast);

var block = new XYZTileCombinerBlock<FileInfo>(3, (file) =>
{
    var coordinate = file.FullName.Split('\\').Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray();
    return XYZTileCombinerBlock<CloudBlockBlob>.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]);
},
(quad) =>
    XYZTileCombinerBlock<FileInfo>.QuadKeyToTileXY(quad,
        (z, x, y) => new FileInfo(Path.Combine(directory.FullName,string.Format("{0}/{1}/{2}.png", z, x, y)))),
    () => new TransformBlock<string, string>((s) =>
    {
        Trace.TraceInformation("Combining {0}", s);
        return s;
    }));

tileFilder.LinkTo(block);


using (new TraceTimer("Time"))
{
    dirBroadcast.Post(directory);

    block.LinkTo(new ActionBlock<FileInfo>((s) =>
    {
        Trace.TraceInformation("Done combining : {0}", s.Name);

    }));
    block.Complete();
    block.Completion.Wait();

}
Run Code Online (Sandbox Code Playgroud)

我想知道如何将其标记为完成,因为循环。一个目录发布到 dirBroadcast 广播器,该广播器发布到 dirfinder 可能会将新目录回发到广播器,所以我不能简单地将其标记为完整,因为它会阻止从 dirfinder 添加的任何目录。我应该重新设计它以跟踪目录的数量,还是在 TPL 中有任何相关内容。

Eni*_*ity 4

如果您的代码的目的是使用某种并行性遍历目录结构,那么我建议不要使用 TPL Dataflow 并使用 Microsoft 的 Reactive Framework。我认为事情变得简单多了。

我就是这样做的。

首先定义一个递归函数来构建目录列表:

Func<DirectoryInfo, IObservable<DirectoryInfo>> recurse = null;
recurse = di =>
    Observable
        .Return(di)
        .Concat(di.GetDirectories()
            .ToObservable()
            .SelectMany(di2 => recurse(di2)))
        .ObserveOn(Scheduler.Default);
Run Code Online (Sandbox Code Playgroud)

这会执行目录的递归并使用默认的 Rx 调度程序,这会导致可观察对象并行运行。

因此,通过使用recurse输入进行调用DirectoryInfo,我可以获得输入目录及其所有后代的可观察列表。

现在我可以构建一个相当直接的查询来获得我想要的结果:

var query =
    from di in recurse(new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles"))
    from fi in di.GetFiles().ToObservable()
    let zxy =
        fi
            .FullName
            .Split('\\')
            .Reverse()
            .Take(3)
            .Reverse()
            .Select(s => int.Parse(Path.GetFileNameWithoutExtension(s)))
            .ToArray()
    let suffix = String.Format("{0}/{1}/{2}.png", zxy[0], zxy[1], zxy[2])
    select new FileInfo(Path.Combine(di.FullName, suffix));
Run Code Online (Sandbox Code Playgroud)

现在我可以像这样操作查询:

query
    .Subscribe(s =>
    {
        Trace.TraceInformation("Done combining : {0}", s.Name);
    });
Run Code Online (Sandbox Code Playgroud)

现在我可能在您的自定义代码中遗漏了一些内容,但如果这是您想要采用的方法,我相信您可以很容易地解决任何逻辑问题。

当子目录和文件用完时,此代码会自动处理完成。

要将 Rx 添加到项目中,请在 NuGet 中查找“Rx-Main”。