Aar*_*ack 5 c# multithreading task-parallel-library blockingcollection tpl-dataflow
我已经编写了一个复制该问题的示例测试。这不是我的实际代码,我尝试编写一个小代码。如果将边界容量增加到迭代次数,从而有效地使其没有边界,则不会出现死锁,如果将最大并行度设置为较小的数字(例如 1),则不会出现死锁。
再说一次,我知道下面的代码不是很好,但我实际发现的代码要大得多并且难以理解。基本上有一个与远程资源的连接的阻塞对象池,并且流中的几个块使用了该连接。
关于如何解决这个问题有什么想法吗?乍一看,这似乎是数据流的问题。当我中断查看线程时,我看到许多线程在 Add 上被阻塞,0 个线程在 take 上被阻塞。addBlocks 出站队列中有几个项目尚未传播到 takeblock,因此它被卡住或死锁。
var blockingCollection = new BlockingCollection<int>(10000);
var takeBlock = new ActionBlock<int>((i) =>
{
int j = blockingCollection.Take();
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 20,
SingleProducerConstrained = true
});
var addBlock = new TransformBlock<int, int>((i) =>
{
blockingCollection.Add(i);
return i;
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 20
});
addBlock.LinkTo(takeBlock, new DataflowLinkOptions()
{
PropagateCompletion = true
});
for (int i = 0; i < 100000; i++)
{
addBlock.Post(i);
}
addBlock.Complete();
await addBlock.Completion;
await takeBlock.Completion;
Run Code Online (Sandbox Code Playgroud)
TPL Dataflow 不适合与阻塞很多的代码一起使用,我认为这个问题源于此。
我无法弄清楚究竟发生了什么,但我认为解决方案是使用非阻塞集合。方便的是,Dataflow 为您提供了BufferBlock. 这样,您的代码将如下所示:
var bufferBlock = new BufferBlock<int>(
new DataflowBlockOptions { BoundedCapacity = 10000 });
var takeBlock = new ActionBlock<int>(
async i =>
{
int j = await bufferBlock.ReceiveAsync();
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 20,
SingleProducerConstrained = true
});
var addBlock = new TransformBlock<int, int>(
async i =>
{
await bufferBlock.SendAsync(i);
return i;
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 20
});
Run Code Online (Sandbox Code Playgroud)
虽然我发现你的代码的整个设计很可疑。如果您想要与块的正常结果一起发送一些附加数据,请将该块的输出类型更改为包含该附加数据的类型。