下游的TPL数据流块如何获取源生成的数据?

cha*_*ase 7 .net c# task-parallel-library tpl-dataflow

我正在使用TPL Dataflow处理图像.我收到处理请求,从流中读取图像,应用几个转换,然后将生成的图像写入另一个流:

Request -> Stream -> Image -> Image ... -> Stream
Run Code Online (Sandbox Code Playgroud)

为此,我使用块:

BufferBlock<Request>
TransformBlock<Request,Stream>
TransformBlock<Stream,Image>
TransformBlock<Image,Image>
TransformBlock<Image,Image>
...
writerBlock = new ActionBlock<Image>
Run Code Online (Sandbox Code Playgroud)

问题是初始Request是包含创建结果所需的一些数据Stream以及我需要的一些额外信息.我是否必须将原始Request(或其他一些上下文对象)传递给writerBlock所有其他块,如下所示:

TransformBlock<Request,Tuple<Request,Stream>>
TransformBlock<Tuple<Request,Stream>,Tuple<Request,Image>>
TransformBlock<Tuple<Request,Image>,Tuple<Request,Image>>
...
Run Code Online (Sandbox Code Playgroud)

(这很难看),还是有办法将第一个块链接到最后一个块(或者,推广到需要附加数据的块)?

svi*_*ick 7

是的,你几乎需要做你所描述的,将每个块中的附加数据传递给下一个块.

但是使用几个辅助方法,你可以使这更简单:

public static IPropagatorBlock<TInput, Tuple<TOutput, TInput>>
    CreateExtendedSource<TInput, TOutput>(Func<TInput, TOutput> transform)
{
    return new TransformBlock<TInput, Tuple<TOutput, TInput>>(
        input => Tuple.Create(transform(input), input));
}

public static IPropagatorBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>
    CreateExtendedTransform<TInput, TOutput, TExtension>(Func<TInput, TOutput> transform)
{
    return new TransformBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>(
        tuple => Tuple.Create(transform(tuple.Item1), tuple.Item2));
}
Run Code Online (Sandbox Code Playgroud)

签名看起来令人生畏,但实际上并没有那么糟糕.

此外,您可能希望添加将选项传递给创建的块的重载,或者重载以获取异步委托的重载.

例如,如果您想使用单独的块对数字执行某些操作,同时沿途传递原始数字,您可以执行以下操作:

var source = new BufferBlock<int>();
var divided = CreateExtendedSource<int, double>(i => i / 2.0);
var formatted = CreateExtendedTransform<double, string, int>(d => d.ToString("0.0"));
var writer = new ActionBlock<Tuple<string, int>>(tuple => Console.WriteLine(tuple));

source.LinkTo(divided);
divided.LinkTo(formatted);
formatted.LinkTo(writer);

for (int i = 0; i < 10; i++)
    source.Post(i);
Run Code Online (Sandbox Code Playgroud)

正如你所看到的,您的lambda(除了最后一个)只处理"当前"值(int,double或者string,根据管道的阶段),"原始"的值(总int)会自动传递.在任何时候,您都可以使用使用普通构造函数创建的块来访问这两个值(如ActionBlock示例中的final ).

(这BufferBlock实际上并不是必需的,但我添加它以更贴合您的设计.)