TPL Dataflow BufferBlock 线程安全吗?

Har*_*lse 5 c# dataflow producer-consumer task-parallel-library tpl-dataflow

我有一个相当简单的生产者-消费者模式,其中(简化)我有两个生产者,他们生产由一个消费者消费的输出。

为此,我使用 System.Threading.Tasks.Dataflow.BufferBlock<T>

一个BufferBlock对象被创建。一个Consumer是听这个BufferBlock,并处理任何接收到的输入。

send data to the同时有两个“生产者BufferBlock”

简化:

BufferBlock<int> bufferBlock = new BufferBlock<int>();

async Task Consume()
{
    while(await bufferBlock.OutputAvailable())
    {
         int dataToProcess = await outputAvailable.ReceiveAsync();
         Process(dataToProcess);
    }
}

async Task Produce1()
{
    IEnumerable<int> numbersToProcess = ...;
    foreach (int numberToProcess in numbersToProcess)
    {
         await bufferBlock.SendAsync(numberToProcess);
         // ignore result for this example
    }
}

async Task Produce2()
{
    IEnumerable<int> numbersToProcess = ...;
    foreach (int numberToProcess in numbersToProcess)
    {
         await bufferBlock.SendAsync(numberToProcess);
         // ignore result for this example
    }
}
Run Code Online (Sandbox Code Playgroud)

我想先启动消费者,然后将生产者作为单独的任务启动:

var taskConsumer = Consume(); // do not await yet
var taskProduce1 = Task.Run( () => Produce1());
var taskProduce2 = Task.Run( () => Produce2());

// await until both producers are finished:
await Task.WhenAll(new Task[] {taskProduce1, taskProduce2});
bufferBlock.Complete(); // signal that no more data is expected in bufferBlock

// await for the Consumer to finish:
await taskConsumer;
Run Code Online (Sandbox Code Playgroud)

乍一看,这正是生产者-消费者的意思:多个生产者生产数据,而消费者正在消费生产的数据。

然而,关于线程安全的 BufferBlock说:

不保证任何实例成员都是线程安全的。

我认为TPL中的P意味着平行!我应该担心吗?我的代码不是线程安全的吗?我应该使用不同的 TPL Dataflow 类吗?

The*_*ias 4

是的,该类BufferBlock是线程安全的。我无法通过指向官方文档来支持这一说法,因为“线程安全”部分已从文档中删除。但我可以在源代码中看到该类包含一个用于同步传入消息的锁对象:

/// <summary>Gets the lock object used to synchronize incoming requests.</summary>
private object IncomingLock { get { return _source; } }
Run Code Online (Sandbox Code Playgroud)

Post调用扩展方法(源代码)时,ITargetBlock.OfferMessage将调用显式实现的方法(源代码)。以下是该方法的摘录:

DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
    T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
    //...
    lock (IncomingLock)
    {
        //...
        _source.AddMessage(messageValue);
        //...
    }
}
Run Code Online (Sandbox Code Playgroud)

如果这个类或TPL DataflowXxxBlock库中包含的任何其他类不是线程安全的,那确实会很奇怪。这将严重影响这个伟大图书馆的易用性。