在TPL数据流中保证交付的BroadcastBlock

Bri*_*ice 6 task-parallel-library tpl-dataflow

我有一个数据流,我以几种不同的方式处理...所以我想发送每个消息的副本我得到多个目标,以便这些目标可以并行执行...但是,我需要设置BoundedCapacity在我的块上,因为数据的流式传输方式比我的目标可以处理的速度快,并且有大量数据.如果没有BoundedCapacity,我会很快耗尽内存.

但是问题是如果目标无法处理它,BroadcastBlock将丢弃消息(由于BoundedCapacity).

我需要的是一个不会丢弃消息的BroadcastBlock,但实质上会拒绝其他输入,直到它可以向每个目标传递消息然后准备好更多.

有这样的东西,还是有人编写了一个以这种方式运行的自定义块?

svi*_*ick 7

这是相当简单的搭建你使用问什么ActionBlockSendAsync(),是这样的:

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    IEnumerable<ITargetBlock<T>> targets)
{
    var targetsList = targets.ToList();

    return new ActionBlock<T>(
        async item =>
        {
            foreach (var target in targetsList)
            {
                await target.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}
Run Code Online (Sandbox Code Playgroud)

这是最基本的版本,但扩展它以支持可变的目标列表,传播完成或克隆功能应该很容易.