使用TPL Dataflow封装以动作块结尾的管道

bor*_*egg 6 c# task-parallel-library tpl-dataflow

TPL Dataflow提供了非常有用的功能:

public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
    ITargetBlock<TInput> target, 
    ISourceBlock<TOutput> source)
Run Code Online (Sandbox Code Playgroud)

使您能够将多个块封装到单个转换块中.它返回一个

IPropagatorBlock<TInput, TOutput>
Run Code Online (Sandbox Code Playgroud)

它代表管道的起始和结束块.

但是,如果我的管道中的最后一个块是ActionBlock,我不能使用它,因为ActionBlock不是SourceBlock,并且函数的返回类型将是ITargetBlock,而不是IPropagatorBlock.

基本上,我正在寻找的是这样的功能:

public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
        ITargetBlock<TStart> startBlock, 
        ActionBlock<TEnd> endBlock)
Run Code Online (Sandbox Code Playgroud)

这是一个明智的事情,或者我错过了一些简单的东西?我不太清楚如何写呢-尤其是类设置完成.我需要创建自己的自定义块类型吗?

编辑:

好的,所以看过@Panagiotis Kanavos的回复,做了一些修补,我想出了这个.这基于EncapsulatingPropagator类,它是现有DataflowBlock.Encapsulate方法使用的类:

internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
        private readonly ITargetBlock<TStart> startBlock;

        private readonly ActionBlock<TEnd> endBlock;

        public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
        {
            this.startBlock = startBlock;
            this.endBlock = endBlock;
        }

        public Task Completion
        {
            get { return this.endBlock.Completion; }
        }

        public void Complete()
        {
            this.startBlock.Complete();
        }

        void IDataflowBlock.Fault(Exception exception)
        {
            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            this.startBlock.Fault(exception);
        }

        public DataflowMessageStatus OfferMessage(
            DataflowMessageHeader messageHeader, 
            TStart messageValue, 
            ISourceBlock<TStart> source, 
            bool consumeToAccept)
        {
            return this.startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
        }
    }
Run Code Online (Sandbox Code Playgroud)

Pan*_*vos 4

封装不用于抽象现有管道,它用于创建需要自定义行为的传播器块,而这些自定义行为无法使用现有块和链接来实现。

例如,滑动窗口示例缓冲发布到其输入块的所有传入消息,并在滑动窗口到期时输出一批所有检索到的消息。

该方法的名称会造成很多混乱,但当您了解其目的时,它们确实有意义:

  • target参数是目标(输入)端点,前面的块将连接到该端点发送消息。在这种情况下,处理传入消息并决定是否发布到输出(源)块的 ActionBlock 是有意义的。
  • source参数是源(输出)端点,后续步骤将连接到该端点以接收消息。使用 ActionBlock 作为源没有任何意义,因为它没有任何输出。

Encapsulate接受 ActionBlock 方法的变体没有source用,因为您可以简单地从任何先前步骤链接到操作块。

编辑

如果您想模块化管道,即将其分解为可重用的、更易于管理的,您可以创建一个构造类,您可以使用普通的旧类。在该类中,您可以正常构建管道片段,链接块(确保传播完成),然后将第一步和最后一步的完成任务公开为公共属性,例如:

class MyFragment
{
    public TransformationBlock<SomeMessage,SomeOther> Input {get;}

    public Task Completion {get;}

    ActionBlock<SomeOther> _finalBlock;

    public MyFragment()
    {
        Input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
        _finalBlock=new ActionBlock<SomeOther>(MyMethod);
        var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}
        Input.LinkTo(_finalBlock,linkOptions);
    }

    private SomeOther MyFunction(SomeMessage msg)
    {
    ...
    }

    private void MyMethod(SomeOther msg)
    {
    ...
    }
}
Run Code Online (Sandbox Code Playgroud)

要将片段连接到管道,您只需从管道块链接到公开Input块即可。要等待完成,只需等待公开的Completion任务即可。

如果需要,您可以停在这里,或者您可以实现ITargetBlock以使片段看起来像 Target 块。您只需将所有方法委托给 Input 块,并将 Completion 属性委托给 Final 块即可。

例如:

class MyFragment:ITargetBlock<SomeMessage> 
{
    ....

    public Task Completion {get;}

    public void Complete()
    {
        Input.Complete()
    };

    public void Fault(Exception exc)
    {
        Input.Fault(exc);
    }

    DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        TInput messageValue,ISourceBlock<TInput> source,bool consumeToAccept)
    {
        return Input.OfferMessage(messageHeader,messageValue,source,consumeToAccept);
    }
}
Run Code Online (Sandbox Code Playgroud)

编辑2

使用 @bornfromanegg 的第一类可以将构建片段的行为与公开输入和完成的样板文件分开:

public ITargetBlock<SomeMessage> BuildMyFragment()
{
    var input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
    var step2=new TransformationBlock<SomeOther,SomeFinal>(MyFunction2);
    var finalBlock=new ActionBlock<SomeFinal>(MyMethod);

    var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}

    input.LinkTo(step2,linkOptions);
    step2.LinkTo(finalBlock,linkOptions);

    return new EncapsulatingTarget(input,finalBlock);
}
Run Code Online (Sandbox Code Playgroud)