use*_*383 -1 c# task-parallel-library tpl-dataflow
在下面的测试代码中,我期待这个结果:
1, 2000
2, 4000
3, 6000
Run Code Online (Sandbox Code Playgroud)
然而实际的结果是:
3, 6000
2, 4000
1, 2000
Run Code Online (Sandbox Code Playgroud)
此外,我只能在 6 秒后在屏幕上看到结果。这意味着任何被竞争的输入都在等待并处理到下一个阶段。
如何使管道在完成后立即吐出每个输入的结果?
public static void Run()
{
Func<int, string> fn = n =>
{
var sleep = n * 2000;
Thread.Sleep(sleep);
return n + ", " + sleep;
};
var opts = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4
};
var transformBlock = new TransformBlock<int, string>(fn, opts);
var bufferBlock = new BufferBlock<string>(opts);
transformBlock.LinkTo(bufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 3; i > 0; i--)
transformBlock.Post(i);
Console.WriteLine(bufferBlock.Receive());
Console.WriteLine(bufferBlock.Receive());
Console.WriteLine(bufferBlock.Receive());
}
Run Code Online (Sandbox Code Playgroud)
默认情况下,即使消息是并行处理的,数据流也会保留消息的顺序。
要尽可能快地转换消息,即乱序,EnsureOrdered请false在您的TransformBlock.
请务必使用最新版本的数据流(目前 nuget 包System.Threading.Tasks.Dataflow存在于 4.9 版中),因为EnsureOrdered并非总是如此。
例子:
class Program
{
static void Main( string[] args )
{
var transformBlock = new TransformBlock<int, int>( x =>
{
Thread.Sleep( x );
return x;
}, new ExecutionDataflowBlockOptions {EnsureOrdered = false, MaxDegreeOfParallelism = 10} );
var actionBlock = new ActionBlock<int>( x => Console.WriteLine( x ) );
transformBlock.LinkTo( actionBlock, new DataflowLinkOptions {PropagateCompletion = true});
for( var i = 9; i > 0; i-- )
transformBlock.Post( i * 1000 );
transformBlock.Complete();
actionBlock.Completion.Wait();
Console.ReadLine();
}
}
Run Code Online (Sandbox Code Playgroud)
这输出
1000
2000
3000
4000
5000
6000
7000
8000
9000
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
887 次 |
| 最近记录: |