TPL DataFlow,优先级链接块?

Rog*_*son 4 c# task-parallel-library tpl-dataflow

使用TPL.DataFlow块,是否可以将两个或多个源链接到单个ITargetBlock(例如ActionBlock)并确定源的优先级?

例如

BufferBlock<string> b1 = new ...
BufferBlock<string> b2 = new ...
ActionBlock<string> a = new ...

//somehow force messages in b1 to be processed before any message of b2, always
b1.LinkTo (a);
b2.LinkTo (a);
Run Code Online (Sandbox Code Playgroud)

只要b1中有消息,我希望这些消息被输入"a",一旦b1为空,b2消息就被推送到"a"

想法?

svi*_*ick 9

TPL Dataflow本身没有类似的东西.

我能想象的最简单的方法就是创建一个封装三个块的结构:高优先级输入,低优先级输入和输出.这些块将是简单BufferBlock的,以及在后台运行的基于优先级将消息从两个输入转发到输出的方法.

代码可能如下所示:

public class PriorityBlock<T>
{
    private readonly BufferBlock<T> highPriorityTarget;

    public ITargetBlock<T> HighPriorityTarget
    {
        get { return highPriorityTarget; }
    }

    private readonly BufferBlock<T> lowPriorityTarget;

    public ITargetBlock<T> LowPriorityTarget
    {
        get { return lowPriorityTarget; }
    }

    private readonly BufferBlock<T> source;

    public ISourceBlock<T> Source
    {
        get { return source; }
    }

    public PriorityBlock()
    {
        var options = new DataflowBlockOptions { BoundedCapacity = 1 };

        highPriorityTarget = new BufferBlock<T>(options);
        lowPriorityTarget = new BufferBlock<T>(options);
        source = new BufferBlock<T>(options);

        Task.Run(() => ForwardMessages());
    }

    private async Task ForwardMessages()
    {
        while (true)
        {
            await Task.WhenAny(
                highPriorityTarget.OutputAvailableAsync(),
                lowPriorityTarget.OutputAvailableAsync());

            T item;

            if (highPriorityTarget.TryReceive(out item))
            {
                await source.SendAsync(item);
            }
            else if (lowPriorityTarget.TryReceive(out item))
            {
                await source.SendAsync(item);
            }
            else
            {
                // both input blocks must be completed
                source.Complete();
                return;
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

用法如下所示:

b1.LinkTo(priorityBlock.HighPriorityTarget);
b2.LinkTo(priorityBlock.LowPriorityTarget);
priorityBlock.Source.LinkTo(a);
Run Code Online (Sandbox Code Playgroud)

为此,a还必须BoundingCapacity设置为一个(或至少非常低的数字).

这段代码的警告是它可以引入两条消息的延迟(一条在输出块中等待,一条等待SendAsync()).因此,如果您有一长串低优先级消息,并且突然出现高优先级消息,则只有在那两个已经等待的低优先级消息之后才会处理它.

如果这对您来说是个问题,那就可以解决了.但我相信它需要更复杂的代码,它处理TPL Dataflow的公共部分,例如OfferMessage().