为什么 BufferBlock<T>.ReceiveAsync() 在数据可用时挂起?

Wiz*_*bit 5 c# task-parallel-library tpl-dataflow

我是 TPL 数据流的新手。

我正在尝试为相当快速的输入流构建一个节流异步更新。BufferBlock 似乎很适合这个想法,因为我可以调用 ReceiveAll() 来从缓冲区中获取所有内容,并且在某些情况下,我无法在 ReceiveAsync() 上等待以在它到达时拾取下一个元素。

但它似乎有时会挂在 ReceiveAsync() 调用上;并且失败的条件很奇怪。

请注意,我对为什么会挂起很感兴趣。我已经找到了另一种使我正在处理的应用程序工作的方法,但它可能不像我没有使用 TPL Dataflow 那样简洁或可扩展,因为我显然不了解它是如何工作的。

进一步说明这里的关键用法是我执行TryReceiveAll()然后等待 ReceiveAsync()如果失败。这是突发数据到达的常见模式,我想将数据作为批处理进行处理。这就是我不想只在ReceiveAsync()上循环的原因,因此为什么直接挂钩ActionBlockTransformBlock是行不通的。如果我删除TryReceiveAll()我的版本似乎按预期工作;不过,正如其他评论所指出的那样,对于不同的人来说似乎是不同的,所以这可能是巧合。

这是一个失败的示例...将其放入引用和使用System.Threading.Tasks.Dataflow.dll的控制台应用程序中:

using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
Run Code Online (Sandbox Code Playgroud)

失败的例子:

class Program
{
    static void Main(string[] args)
    {
        var context = new CancellationTokenSource();
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { CancellationToken = context.Token });
        var task = Task.Run(() =>ProcessBuffer(buffer, context.Token), context.Token);

        // shove 10 things onto the buffer
        for(int i = 0; i < 10; i++)
        {
            // shove something on the buffer every second
            buffer.Post(i);
            Thread.Sleep(1000);
        }
        context.Cancel();
    }

    // 1. We expect to see something hit the buffer and go immediately through on the TryReceiveAll
    // 2. Then we should see nothing for a second
    // 3. Then we immediately process the next element from the await ReceiveAsync.
    // 4. We should then repeat 2 & 3 'till i == 10 as there will be nothign for TryReceiveAll.
    static async Task ProcessBuffer(BufferBlock<int> buffer, CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                Console.WriteLine(DateTime.Now.ToShortTimeString() + ": This Breaks it...");
                IList<int> elements;
                if (!buffer.TryReceiveAll(out elements))
                {
                    try
                    {
                        var element = await buffer.ReceiveAsync(TimeSpan.FromMilliseconds(5000), token);
                        elements = new List<int> { element };
                    }
                    catch (TimeoutException) { Console.WriteLine("Failed to get anything on await..."); }
                }
                if (elements != null) Console.WriteLine("Elements: " + string.Join(", ", elements));
            }
        }
        catch (Exception e) { Console.WriteLine("Exception in thread: " + e.ToString()); }
    }
}
Run Code Online (Sandbox Code Playgroud)

输出是:

11:27: This Breaks it...
Elements: 0
11:27: This Breaks it...
Failed to get anything on await...
11:27: This Breaks it...
Elements: 1, 2, 3, 4, 5
Run Code Online (Sandbox Code Playgroud)

...等等

但是如果我改变日志行

Console.WriteLine(DateTime.Now.ToShortTimeString() + ": This Breaks it...");
Run Code Online (Sandbox Code Playgroud)

Console.WriteLine("This Works...");
Run Code Online (Sandbox Code Playgroud)

输出按预期出现:

This Works...
Elements: 0
This Works...
Elements: 1
This Works...
Elements: 2
This Works...
Run Code Online (Sandbox Code Playgroud)

等等。但即使是从控制台复制文本的行为也会将其切换回失败的输出。

想法?