Jam*_*mes 49 c# multithreading waithandle
我的应用程序产生了大量不同的小工作线程,通过ThreadPool.QueueUserWorkItem
它我可以通过多个ManualResetEvent
实例跟踪.我使用该WaitHandle.WaitAll
方法阻止我的应用程序关闭,直到这些线程完成.
但是,之前我从未遇到过任何问题,因为我的应用程序正在承受更多负载,即创建更多线程,我现在开始得到这个异常:
WaitHandles must be less than or equal to 64 - missing documentation
什么是最好的替代解决方案?
代码片段
List<AutoResetEvent> events = new List<AutoResetEvent>();
// multiple instances of...
var evt = new AutoResetEvent(false);
events.Add(evt);
ThreadPool.QueueUserWorkItem(delegate
{
// do work
evt.Set();
});
...
WaitHandle.WaitAll(events.ToArray());
Run Code Online (Sandbox Code Playgroud)
解决方法
int threadCount = 0;
ManualResetEvent finished = new ManualResetEvent(false);
...
Interlocked.Increment(ref threadCount);
ThreadPool.QueueUserWorkItem(delegate
{
try
{
// do work
}
finally
{
if (Interlocked.Decrement(ref threadCount) == 0)
{
finished.Set();
}
}
});
...
finished.WaitOne();
Run Code Online (Sandbox Code Playgroud)
dtb*_*dtb 46
创建一个跟踪正在运行的任务数量的变量:
int numberOfTasks = 100;
Run Code Online (Sandbox Code Playgroud)
创建一个信号:
ManualResetEvent signal = new ManualResetEvent(false);
Run Code Online (Sandbox Code Playgroud)
任务完成时减少任务数量:
if (Interlocked.Decrement(ref numberOftasks) == 0)
{
Run Code Online (Sandbox Code Playgroud)
如果没有剩余任务,请设置信号:
signal.Set();
}
Run Code Online (Sandbox Code Playgroud)
同时,在其他地方,等待信号设置:
signal.WaitOne();
Run Code Online (Sandbox Code Playgroud)
cas*_*One 40
从.NET 4.0开始,您可以使用另外两个(和IMO,更干净)选项.
首先是使用CountdownEvent
该类.它可以防止必须自己处理递增和递减:
int tasks = <however many tasks you're performing>;
// Dispose when done.
using (var e = new CountdownEvent(tasks))
{
// Queue work.
ThreadPool.QueueUserWorkItem(() => {
// Do work
...
// Signal when done.
e.Signal();
});
// Wait till the countdown reaches zero.
e.Wait();
}
Run Code Online (Sandbox Code Playgroud)
但是,有一个更强大的解决方案,那就是使用Task
类,如下所示:
// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
// Create task here.
Task.Factory.StartNew(() => {
// Do work.
}
// No signalling, no anything.
).ToArray();
// Wait on all the tasks.
Task.WaitAll(tasks);
Run Code Online (Sandbox Code Playgroud)
使用Task
类和调用WaitAll
更加清晰,IMO,因为你在整个代码中编织更少的线程原语(注意,没有等待句柄); 您不必设置计数器,处理递增/递减,您只需设置任务然后等待它们.这使得代码在您想要做的事情中更具表现力,而不是如何表达(至少在管理它的并行化方面).
.NET 4.5提供了更多选项,您可以Task
通过调用类上的静态Run
方法Task
来简化实例序列的生成:
// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
// Create task here.
Task.Run(() => {
// Do work.
})
// No signalling, no anything.
).ToArray();
// Wait on all the tasks.
Tasks.WaitAll(tasks);
Run Code Online (Sandbox Code Playgroud)
或者,您可以利用TPL DataFlow库(它位于System
命名空间中,因此它是官方的,即使它是从NuGet下载,如实体框架)并使用ActionBlock<TInput>
,如下所示:
// Create the action block. Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<object>(o => {
// Do work.
});
// Post 100 times.
foreach (int i in Enumerable.Range(0, 100)) actionBlock.Post(null);
// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();
// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();
Run Code Online (Sandbox Code Playgroud)
请注意,ActionBlock<TInput>
默认情况下,一次处理一个项目,因此如果您希望一次处理多个操作,则必须通过传递ExecutionDataflowBlockOptions
实例并设置MaxDegreeOfParallelism
属性来设置要在构造函数中处理的并发项目数.:
var actionBlock = new ActionBlock<object>(o => {
// Do work.
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
Run Code Online (Sandbox Code Playgroud)
如果您的操作确实是线程安全的,那么您可以将该MaxDegreeOfParallelsim
属性设置为DataFlowBlockOptions.Unbounded
:
var actionBlock = new ActionBlock<object>(o => {
// Do work.
}, new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = DataFlowBlockOptions.Unbounded
});
Run Code Online (Sandbox Code Playgroud)
问题的关键是,你有过细致的控制如何并行你希望你的选择是.
当然,如果您有一系列要传递到ActionBlock<TInput>
实例中的项目,那么您可以链接ISourceBlock<TOutput>
实现来提供ActionBlock<TInput>
,如下所示:
// The buffer block.
var buffer = new BufferBlock<int>();
// Create the action block. Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<int>(o => {
// Do work.
});
// Link the action block to the buffer block.
// NOTE: An IDisposable is returned here, you might want to dispose
// of it, although not totally necessary if everything works, but
// still, good housekeeping.
using (link = buffer.LinkTo(actionBlock,
// Want to propagate completion state to the action block.
new DataflowLinkOptions {
PropagateCompletion = true,
},
// Can filter on items flowing through if you want.
i => true)
{
// Post 100 times to the *buffer*
foreach (int i in Enumerable.Range(0, 100)) buffer.Post(i);
// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();
// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();
}
Run Code Online (Sandbox Code Playgroud)
根据你需要做什么,在TPL数据流库成为很多更有吸引力的选择,因为它处理跨并发所有连在一起的任务,它可以让你要非常具体只是希望每个片是如何并行,同时保持每个区块的关注点的正确分离.
Bri*_*eon 17
您的解决方法不正确.其原因是,Set
和WaitOne
如果最后工作项目导致比赛可能threadCount
要到零之前排队线程不得不机会排队的所有工作项目.修复很简单.将排队线程视为工作项本身.初始化threadCount
为1并在排队完成时进行减量并发出信号.
int threadCount = 1;
ManualResetEvent finished = new ManualResetEvent(false);
...
Interlocked.Increment(ref threadCount);
ThreadPool.QueueUserWorkItem(delegate
{
try
{
// do work
}
finally
{
if (Interlocked.Decrement(ref threadCount) == 0)
{
finished.Set();
}
}
});
...
if (Interlocked.Decrement(ref threadCount) == 0)
{
finished.Set();
}
finished.WaitOne();
Run Code Online (Sandbox Code Playgroud)
作为个人喜好,我喜欢用CountdownEvent
课程为我做计数.
var finished = new CountdownEvent(1);
...
finished.AddCount();
ThreadPool.QueueUserWorkItem(delegate
{
try
{
// do work
}
finally
{
finished.Signal();
}
});
...
finished.Signal();
finished.Wait();
Run Code Online (Sandbox Code Playgroud)
添加到dtb的答案,你可以把它包装成一个很好的简单类.
public class Countdown : IDisposable
{
private readonly ManualResetEvent done;
private readonly int total;
private long current;
public Countdown(int total)
{
this.total = total;
current = total;
done = new ManualResetEvent(false);
}
public void Signal()
{
if (Interlocked.Decrement(ref current) == 0)
{
done.Set();
}
}
public void Wait()
{
done.WaitOne();
}
public void Dispose()
{
((IDisposable)done).Dispose();
}
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
26199 次 |
最近记录: |