BroadcastCopyBlock for TPL Dataflow with guaranteed delivery

cra*_*ank 4 c# broadcast task-parallel-library tpl-dataflow

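I would be glad for some input on the following implementation of a BroadcastCopyBlock in TPL Dataflow. It copies a received message to all consumers that registered with the BroadcastCopyBlock, and it guarantees delivery to every consumer that is linked to the block at the time the message is received. (This is unlike BroadcastBlock, which does not guarantee delivery: if the next message comes in before the previous one has been delivered to all consumers, the previous one can be dropped.)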

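My main concern is the reservation of messages and the release of reservations. What happens if a receiving block decides not to handle a message? My understanding is that this would create a memory leak, because the message would stay reserved indefinitely. I think I should somehow mark the message as unused, but I'm not sure how. I was considering some artificial message sink (an ActionBlock with no action), or can I mark the message as discarded?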
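The "artificial message sink" idea corresponds to a block the library already provides: DataflowBlock.NullTarget<T>() returns a target that accepts every message and drops it. A common use is as the last link of a source, so that messages no other link accepts are discarded instead of staying in the source's buffer. The sketch below only illustrates that sink pattern on a BufferBlock; it assumes a reference to the System.Threading.Tasks.Dataflow package, and NullTargetSketch/RunAsync are hypothetical names.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo: a filtered link plus a NullTarget link acting as a sink.
public static class NullTargetSketch
{
    public static async Task<int[]> RunAsync()
    {
        var buffer = new BufferBlock<int>();
        var received = new ConcurrentQueue<int>();
        var evens = new ActionBlock<int>(i => received.Enqueue(i));

        // Only even numbers pass this link's predicate.
        buffer.LinkTo(evens, new DataflowLinkOptions { PropagateCompletion = true }, i => i % 2 == 0);

        // Messages rejected by the first link are accepted and dropped here. Without this
        // link, the first odd number would stay at the head of the buffer and block the rest.
        buffer.LinkTo(DataflowBlock.NullTarget<int>());

        for (var i = 0; i < 6; i++)
            buffer.Post(i);
        buffer.Complete();

        // The buffer can drain and complete, so completion reaches the ActionBlock.
        await evens.Completion;
        return received.ToArray();
    }
}
```

Calling `NullTargetSketch.RunAsync()` yields only the even values, and the pipeline completes because nothing is left behind in the buffer.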

Further input on the implementation is also appreciated.

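This is probably almost a duplicate of the following question, but I would prefer to use my own class instead of a method that creates the block. Or would that be considered bad style?
BroadcastBlock with guaranteed delivery in TPL Dataflow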

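using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

/// <summary>
/// Broadcasts the same message to multiple consumers. This does NOT clone the message, all consumers receive an identical message
/// </summary>
/// <typeparam name="T">The type of the broadcast messages.</typeparam>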
public class BrodcastCopyBlock<T> : IPropagatorBlock<T, T>
{
    private ITargetBlock<T> In { get; }

    /// <summary>
    /// Holds a TransformBlock for each target that has been linked to this block
    /// </summary>
    private readonly IDictionary<ITargetBlock<T>, TransformBlock<T, T>> _OutBlocks = new Dictionary<ITargetBlock<T>, TransformBlock<T, T>>();


    public BrodcastCopyBlock()
    {
        In = new ActionBlock<T>(message => Process(message));

        // Once the input block finishes, forward completion (or the fault) to every output block.
        In.Completion.ContinueWith(task =>
        {
            if (task.Exception == null)
                Complete();
            else
                Fault(task.Exception);
        });
    }

    /// <summary>
    /// Creates a transform source block for the passed target.
    /// </summary>
    /// <param name="target">The target that the new block will be linked to.</param>
    private void CreateOutBlock(ITargetBlock<T> target)
    {
        if (_OutBlocks.ContainsKey(target))
            return;

        var outBlock = new TransformBlock<T, T>(e => e);
        _OutBlocks[target] = outBlock;
    }

    private void Process(T message)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            outBlock.Post(message);
        }
    }

    /// <inheritdoc />
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        return In.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

    /// <inheritdoc />
    public void Complete()
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Complete();
        }
    }

    /// <inheritdoc />
    public void Fault(Exception exception)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Fault(exception);
        }
    }

    /// <inheritdoc />
    public Task Completion => Task.WhenAll(_OutBlocks.Select(b => b.Value.Completion));

    /// <inheritdoc />
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        CreateOutBlock(target);
        return _OutBlocks[target].LinkTo(target, linkOptions);
    }

    /// <inheritdoc />
    public T ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    /// <inheritdoc />
    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ReserveMessage(messageHeader, target);
    }

    /// <inheritdoc />
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        ((ISourceBlock<T>)_OutBlocks[target]).ReleaseReservation(messageHeader, target);
    }
}

VMA*_*Atm 8

TL/DR
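Your implementation uses the Post method inside the ActionBlock, which will still lose data if the target rejects the message; switch to SendAsync instead. Also, you probably don't need to implement all of these methods; implementing the ITargetBlock<in TInput> interface is enough.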


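Before coming back to your main question, I want to clarify a few things. I think you are confusing some of the options in the TPL Dataflow library, so I'd like to explain them here. The behavior you describe, "The first consumer, which receives the message, deletes it from the queue", has nothing to do with BroadcastBlock; it is about multiple consumers linked to a single ISourceBlock, such as a BufferBlock: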

var buffer = new BufferBlock<int>();
var consumer1 = new ActionBlock<int>(i => {});
var consumer2 = new ActionBlock<int>(i => { Console.WriteLine(i); });
buffer.LinkTo(consumer1);
buffer.LinkTo(consumer2);
// this message goes to only one consumer (consumer1, linked first), so there is no console output
buffer.Post(1);
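To make the contrast between the two source blocks concrete, the sketch below links two counting consumers to a given source, sends a few messages, and reports how many each consumer processed. With a BufferBlock every message goes to the first consumer that accepts it; with a BroadcastBlock both unbounded consumers see every message. It assumes the System.Threading.Tasks.Dataflow package; FanOutSketch and CountAsync are hypothetical names.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FanOutSketch
{
    // Links two counting consumers to 'source', sends 'count' messages,
    // and returns how many messages each consumer processed.
    public static async Task<(int First, int Second)> CountAsync(IPropagatorBlock<int, int> source, int count)
    {
        int first = 0, second = 0;
        var consumer1 = new ActionBlock<int>(_ => Interlocked.Increment(ref first));
        var consumer2 = new ActionBlock<int>(_ => Interlocked.Increment(ref second));

        var options = new DataflowLinkOptions { PropagateCompletion = true };
        source.LinkTo(consumer1, options);
        source.LinkTo(consumer2, options);

        for (var i = 0; i < count; i++)
            await source.SendAsync(i);
        source.Complete();

        await Task.WhenAll(consumer1.Completion, consumer2.Completion);
        return (first, second);
    }
}
```

For example, `CountAsync(new BufferBlock<int>(), 3)` gives `(3, 0)`, while `CountAsync(new BroadcastBlock<int>(i => i), 3)` gives `(3, 3)`.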

BroadcastBlock does exactly what you describe. Consider the following code:

private static void UnboundedCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Unbounded Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
        {
            Thread.Sleep(2000);
            Console.WriteLine($"SLOW Unbounded Block: {i}");
        });
    broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

The output will be:

FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2

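However, this only works as long as data comes in more slowly than it is processed; otherwise, as you stated in your question, you will quickly run out of memory because the buffers keep growing. Let's see what happens if we use ExecutionDataflowBlockOptions to limit the incoming-data buffer of the slow block: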

private static void BoundedCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Bounded Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
        {
            Thread.Sleep(2000);
            Console.WriteLine($"SLOW Bounded Block: {i}");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

The output will be:

FAST Bounded Block: 0
FAST Bounded Block: 1
FAST Bounded Block: 2
SLOW Bounded Block: 0
SLOW Bounded Block: 1

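As you can see, our slow block lost the last message, which is not what we want. The reason is that BroadcastBlock, by default, uses the Post method to deliver messages. According to the official introduction document: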

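  • Post
    • An extension method that asynchronously posts to the target block. It returns immediately whether the data could be accepted or not, and it does not allow for the target to consume the message at a later time.
  • SendAsync
    • An extension method that asynchronously sends to target blocks while supporting buffering. A Post operation on a target is asynchronous, but if a target wants to postpone the offered data, there is nowhere for the data to be buffered and the target must instead be forced to decline. SendAsync enables asynchronous posting of the data with buffering, such that if a target postpones, it will later be able to retrieve the postponed data from the temporary buffer used for this one asynchronously posted message.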
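The difference is easy to observe on a bounded block. In the sketch below, a BufferBlock with BoundedCapacity = 1 declines a second Post outright, so that value is lost, while SendAsync returns a pending Task<bool> that completes once a slot frees up and the block consumes the postponed message. It assumes the System.Threading.Tasks.Dataflow package; PostVsSendAsyncSketch is a hypothetical name.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PostVsSendAsyncSketch
{
    public static (bool FirstPost, bool SecondPost, bool SendPendingWhileFull, bool SendResult, int[] Received) Run()
    {
        var timeout = TimeSpan.FromSeconds(5);
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = buffer.Post(1);   // accepted: the buffer was empty
        bool secondPost = buffer.Post(2);  // declined: the buffer is full, so 2 is lost

        // SendAsync lets the full block postpone the message instead of declining it.
        Task<bool> send = buffer.SendAsync(3);
        bool pendingWhileFull = !send.IsCompleted;

        int a = buffer.Receive(timeout);   // frees the slot; the block then consumes 3
        bool sendResult = send.Wait(timeout) && send.Result;
        int b = buffer.Receive(timeout);

        return (firstPost, secondPost, pendingWhileFull, sendResult, new[] { a, b });
    }
}
```

The received values are 1 and 3: the value sent with Post while the block was full never arrives, the one sent with SendAsync does.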

So this method can help us with our task. Let's introduce a wrapper ActionBlock that does exactly what we need, namely SendAsync the data to our real processor:

private static void BoundedWrapperInfiniteCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Wrapper Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
    var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));

    broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

The output will be:

FAST Wrapper Block: 0
FAST Wrapper Block: 1
FAST Wrapper Block: 2
SLOW Wrapper Block: 0
SLOW Wrapper Block: 1
SLOW Wrapper Block: 2

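But this wait never ends: our basic wrapper does not propagate completion to the linked blocks, and an ActionBlock cannot be linked to anything. We can try waiting for the wrapper to complete instead: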

private static void BoundedWrapperFiniteCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST finite Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
        {
            Thread.Sleep(2000);
            Console.WriteLine($"SLOW finite Block: {i}");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
    var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
    broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowActionWrapper.Completion.Wait();
}

The output will be:

FAST finite Block: 0
FAST finite Block: 1
FAST finite Block: 2
SLOW finite Block: 0

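This is definitely not what we want: the wrapper ActionBlock finished all of its work and does not wait for the slow block to process the remaining messages. Moreover, we don't even see the second message, because we exit the method before the Sleep call returns! So you definitely need your own implementation.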
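One way to close the gap in these two attempts is to let the wrapper forward completion itself. The sketch below is a minimal variant of the wrappers above, assuming the System.Threading.Tasks.Dataflow package; CreateSendAsyncWrapper, WrapperSketch and RunSlowConsumerAsync are hypothetical names, and a 50 ms delay stands in for the 2-second Thread.Sleep. Each wrapper awaits SendAsync for every message and, once it has finished, completes (or faults) the real target, so waiting on the slow block's Completion returns only after it has processed every message.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class WrapperSketch
{
    // Hypothetical helper: an ActionBlock that forwards each item with SendAsync
    // (awaiting acceptance) and propagates its own completion or fault to the target.
    public static ITargetBlock<T> CreateSendAsyncWrapper<T>(ITargetBlock<T> target)
    {
        var wrapper = new ActionBlock<T>(item => target.SendAsync(item));
        wrapper.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) target.Fault(t.Exception!);
            else target.Complete();
        }, TaskScheduler.Default);
        return wrapper;
    }

    public static async Task<int[]> RunSlowConsumerAsync(int count)
    {
        var slowReceived = new ConcurrentQueue<int>();
        var broadcast = new BroadcastBlock<int>(i => i);
        var fastAction = new ActionBlock<int>(i => { });
        var slowAction = new ActionBlock<int>(i =>
        {
            Thread.Sleep(50); // simulated slow work
            slowReceived.Enqueue(i);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

        var options = new DataflowLinkOptions { PropagateCompletion = true };
        broadcast.LinkTo(CreateSendAsyncWrapper(slowAction), options);
        broadcast.LinkTo(CreateSendAsyncWrapper(fastAction), options);

        for (var i = 0; i < count; i++)
            await broadcast.SendAsync(i);
        broadcast.Complete();

        // Completion now flows broadcast -> wrapper -> real target.
        await Task.WhenAll(slowAction.Completion, fastAction.Completion);
        return slowReceived.ToArray();
    }
}
```

With this wrapper the bounded slow block receives every message in order, and the final wait returns instead of hanging.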

Now, finally, some thoughts about your code:

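  1. You don't need to implement so many methods: your wrapper will be used as an ITargetBlock<in TInput>, so implement only that interface.
  2. Your implementation uses the Post method inside the ActionBlock, which, as we saw, can lead to data loss if something goes wrong on the consumer side. Consider the SendAsync method instead.
  3. After the previous change, you should measure the performance of your dataflow: if many asynchronous waits for data delivery pile up, you may see performance and/or memory problems. This should be addressed with some of the advanced settings discussed in the linked document.
  4. Your implementation of the Completion task actually reverses the order of your dataflow: you are waiting for the targets to complete, which I don't think is a good practice. You should probably create an ending block for your dataflow (this could even be a NullTarget block, which simply drops incoming messages synchronously) and wait for it to complete.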
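Putting points 1, 2 and 4 together, a target-only version could look roughly like the sketch below. It is neither the asker's class nor a library type: GuaranteedBroadcastTarget is a hypothetical name, it assumes the System.Threading.Tasks.Dataflow package, it assumes the set of consumers is fixed at construction, and a consumer that declines permanently (SendAsync returns false, for example because it has already completed) is simply skipped. Its Completion only reports that every message was handed to every consumer; as point 4 suggests, callers wait on their own end blocks.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

/// <summary>
/// Hypothetical target-only broadcast: each message is sent with SendAsync to every consumer,
/// and the next message is taken only after all consumers have accepted the current one.
/// </summary>
public sealed class GuaranteedBroadcastTarget<T> : ITargetBlock<T>
{
    private readonly ITargetBlock<T> _input;
    private readonly IReadOnlyList<ITargetBlock<T>> _targets;

    public GuaranteedBroadcastTarget(IEnumerable<ITargetBlock<T>> targets,
        int boundedCapacity = DataflowBlockOptions.Unbounded)
    {
        _targets = targets.ToList();

        // SendAsync yields false if a consumer declines permanently; this sketch ignores that case.
        _input = new ActionBlock<T>(
            message => Task.WhenAll(_targets.Select(t => t.SendAsync(message))),
            new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

        // Forward completion (or the fault) to every consumer once all messages are handed off.
        _input.Completion.ContinueWith(task =>
        {
            foreach (var target in _targets)
            {
                if (task.IsFaulted) target.Fault(task.Exception!);
                else target.Complete();
            }
        }, TaskScheduler.Default);
    }

    // Completes when every message has been accepted by every consumer.
    public Task Completion => _input.Completion;

    public void Complete() => _input.Complete();

    public void Fault(Exception exception) => _input.Fault(exception);

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue,
        ISourceBlock<T>? source, bool consumeToAccept) =>
        _input.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
```

Usage: construct it with the consumer blocks, SendAsync messages into it, call Complete(), and await the consumers' own Completion tasks (or a final end block). A bounded slow consumer then slows the whole broadcast down instead of silently missing messages, which is the trade-off point 3 asks you to measure.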