Kur*_*rru 5 c# queue multithreading monitor
我有两个线程,一个线程处理队列,另一个线程将内容添加到队列中。
然而,这些函数在调用时调用System.Threading.SynchronizationLockException: Object synchronization method was called from an unsynchronized block of code,Monitor.PulseAll(waiting);因为我尚未将函数与等待对象同步。[我不想这样做,我希望能够在将项目添加到队列时进行处理]。我怎样才能实现这个目标?
Queue<object> items = new Queue<object>();
object waiting = new object();
Run Code Online (Sandbox Code Playgroud)
第一个线程
public void ProcessQueue()
{
while (true)
{
if (items.Count == 0)
Monitor.Wait(waiting);
object real = null;
lock(items) {
object item = items.Dequeue();
real = item;
}
if(real == null)
continue;
.. bla bla bla
}
}
Run Code Online (Sandbox Code Playgroud)
第二个线程涉及
public void AddItem(object o)
{
... bla bla bla
lock(items)
{
items.Enqueue(o);
}
Monitor.PulseAll(waiting);
}
Run Code Online (Sandbox Code Playgroud)
如果您可以访问.NET 4.0,则可以通过BlockingCollection<T>来实现您想要做的事情。如果您想通过类和信号
来自己完成,那么您实际上走在正确的轨道上。
您会得到异常,因为要调用,并且您必须拥有指定对象的。你碰巧错过了这个。MonitorPulse()Wait()Pulse()PulseAll()lockwaiting
可以使用的基本线程安全队列示例:
foreach对消费者来说,while您最喜欢的生产者方面的条件构造,lock()、Monitor.Pulse()和: Monitor.PulseAll() Monitor.Wait()。
public class SignaledQueue<T>
{
Queue<T> queue = new Queue<T>();
volatile bool shutDown = false;
public bool Enqueue(T item)
{
if (!shutDown)
{
lock (queue)
{
queue.Enqueue(item);
//Pulse only if there can be waiters.
if (queue.Count == 1)
{
Monitor.PulseAll(queue);
}
}
return true;
}
//Indicate that processing should stop.
return false;
}
public IEnumerable<T> DequeueAll()
{
while (!shutDown)
{
do
{
T item;
lock (queue)
{
//If the queue is empty, wait.
if (queue.Count == 0)
{
if (shutDown) break;
Monitor.Wait(queue);
if (queue.Count == 0) break;
}
item = queue.Dequeue();
}
yield return item;
} while (!shutDown);
}
}
public void SignalShutDown()
{
shutDown = true;
lock (queue)
{
//Signal all waiting consumers with PulseAll().
Monitor.PulseAll(queue);
}
}
}
Run Code Online (Sandbox Code Playgroud)
使用示例:
class Program
{
static void Main(string[] args)
{
int numProducers = 4, numConsumers = 2;
SignaledQueue<int> queue = new SignaledQueue<int>();
ParameterizedThreadStart produce = delegate(object obj)
{
Random rng = new Random((int)obj);
int num = 0;
while (queue.Enqueue(++num))
{
Thread.Sleep(rng.Next(100));
}
};
ThreadStart consume = delegate
{
foreach (int num in queue.DequeueAll())
{
Console.Write(" {0}", num);
}
};
Random seedRng = new Random();
for (int i = 0; i < numProducers; i++)
{
new Thread(produce).Start(seedRng.Next());
}
for (int i = 0; i < numConsumers; i++)
{
new Thread(consume).Start();
}
Console.ReadKey(true);
queue.SignalShutDown();
}
}
Run Code Online (Sandbox Code Playgroud)