Rog*_*son 4 c# task-parallel-library tpl-dataflow
使用TPL.DataFlow块,是否可以将两个或多个源链接到单个ITargetBlock(例如ActionBlock)并确定源的优先级?
例如
BufferBlock<string> b1 = new ...
BufferBlock<string> b2 = new ...
ActionBlock<string> a = new ...
//somehow force messages in b1 to be processed before any message of b2, always
b1.LinkTo (a);
b2.LinkTo (a);
Run Code Online (Sandbox Code Playgroud)
只要b1中有消息,我希望这些消息被输入"a",一旦b1为空,b2消息就被推送到"a"
想法?
TPL Dataflow本身没有类似的东西.
我能想象的最简单的方法就是创建一个封装三个块的结构:高优先级输入,低优先级输入和输出.这些块将是简单BufferBlock
的,以及在后台运行的基于优先级将消息从两个输入转发到输出的方法.
代码可能如下所示:
public class PriorityBlock<T>
{
private readonly BufferBlock<T> highPriorityTarget;
public ITargetBlock<T> HighPriorityTarget
{
get { return highPriorityTarget; }
}
private readonly BufferBlock<T> lowPriorityTarget;
public ITargetBlock<T> LowPriorityTarget
{
get { return lowPriorityTarget; }
}
private readonly BufferBlock<T> source;
public ISourceBlock<T> Source
{
get { return source; }
}
public PriorityBlock()
{
var options = new DataflowBlockOptions { BoundedCapacity = 1 };
highPriorityTarget = new BufferBlock<T>(options);
lowPriorityTarget = new BufferBlock<T>(options);
source = new BufferBlock<T>(options);
Task.Run(() => ForwardMessages());
}
private async Task ForwardMessages()
{
while (true)
{
await Task.WhenAny(
highPriorityTarget.OutputAvailableAsync(),
lowPriorityTarget.OutputAvailableAsync());
T item;
if (highPriorityTarget.TryReceive(out item))
{
await source.SendAsync(item);
}
else if (lowPriorityTarget.TryReceive(out item))
{
await source.SendAsync(item);
}
else
{
// both input blocks must be completed
source.Complete();
return;
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
用法如下所示:
b1.LinkTo(priorityBlock.HighPriorityTarget);
b2.LinkTo(priorityBlock.LowPriorityTarget);
priorityBlock.Source.LinkTo(a);
Run Code Online (Sandbox Code Playgroud)
为此,a
还必须BoundingCapacity
设置为一个(或至少非常低的数字).
这段代码的警告是它可以引入两条消息的延迟(一条在输出块中等待,一条等待SendAsync()
).因此,如果您有一长串低优先级消息,并且突然出现高优先级消息,则只有在那两个已经等待的低优先级消息之后才会处理它.
如果这对您来说是个问题,那就可以解决了.但我相信它需要更复杂的代码,它处理TPL Dataflow的公共部分,例如OfferMessage()
.