Wiz*_*bit 5 c# task-parallel-library tpl-dataflow
我是 TPL 数据流的新手。
我正在尝试为相当快速的输入流构建一个节流异步更新。BufferBlock 似乎很适合这个想法,因为我可以调用 ReceiveAll() 来从缓冲区中获取所有内容,并且在某些情况下,我无法在 ReceiveAsync() 上等待以在它到达时拾取下一个元素。
但它似乎有时会挂在 ReceiveAsync() 调用上;并且失败的条件很奇怪。
请注意,我对为什么会挂起很感兴趣。我已经找到了另一种使我正在处理的应用程序工作的方法,但它可能不像我没有使用 TPL Dataflow 那样简洁或可扩展,因为我显然不了解它是如何工作的。
进一步说明这里的关键用法是我执行TryReceiveAll()然后等待 ReceiveAsync()如果失败。这是突发数据到达的常见模式,我想将数据作为批处理进行处理。这就是我不想只在ReceiveAsync()上循环的原因,因此为什么直接挂钩ActionBlock或TransformBlock是行不通的。如果我删除TryReceiveAll()我的版本似乎按预期工作;不过,正如其他评论所指出的那样,对于不同的人来说似乎是不同的,所以这可能是巧合。
这是一个失败的示例...将其放入引用和使用System.Threading.Tasks.Dataflow.dll的控制台应用程序中:
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
Run Code Online (Sandbox Code Playgroud)
失败的例子:
class Program
{
static void Main(string[] args)
{
var context = new CancellationTokenSource();
var buffer = new BufferBlock<int>(new DataflowBlockOptions { CancellationToken = context.Token });
var task = Task.Run(() =>ProcessBuffer(buffer, context.Token), context.Token);
// shove 10 things onto the buffer
for(int i = 0; i < 10; i++)
{
// shove something on the buffer every second
buffer.Post(i);
Thread.Sleep(1000);
}
context.Cancel();
}
// 1. We expect to see something hit the buffer and go immediately through on the TryReceiveAll
// 2. Then we should see nothing for a second
// 3. Then we immediately process the next element from the await ReceiveAsync.
// 4. We should then repeat 2 & 3 'till i == 10 as there will be nothign for TryReceiveAll.
static async Task ProcessBuffer(BufferBlock<int> buffer, CancellationToken token)
{
try
{
while (!token.IsCancellationRequested)
{
Console.WriteLine(DateTime.Now.ToShortTimeString() + ": This Breaks it...");
IList<int> elements;
if (!buffer.TryReceiveAll(out elements))
{
try
{
var element = await buffer.ReceiveAsync(TimeSpan.FromMilliseconds(5000), token);
elements = new List<int> { element };
}
catch (TimeoutException) { Console.WriteLine("Failed to get anything on await..."); }
}
if (elements != null) Console.WriteLine("Elements: " + string.Join(", ", elements));
}
}
catch (Exception e) { Console.WriteLine("Exception in thread: " + e.ToString()); }
}
}
Run Code Online (Sandbox Code Playgroud)
输出是:
11:27: This Breaks it...
Elements: 0
11:27: This Breaks it...
Failed to get anything on await...
11:27: This Breaks it...
Elements: 1, 2, 3, 4, 5
Run Code Online (Sandbox Code Playgroud)
...等等
但是如果我改变日志行
Console.WriteLine(DateTime.Now.ToShortTimeString() + ": This Breaks it...");
Run Code Online (Sandbox Code Playgroud)
到
Console.WriteLine("This Works...");
Run Code Online (Sandbox Code Playgroud)
输出按预期出现:
This Works...
Elements: 0
This Works...
Elements: 1
This Works...
Elements: 2
This Works...
Run Code Online (Sandbox Code Playgroud)
等等。但即使是从控制台复制文本的行为也会将其切换回失败的输出。
想法?