Alf*_*feG 0 .net c# lambda multithreading
我已经创建了一个自定义线程池实用程序,但似乎有一个我找不到的问题.
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
namespace iWallpaper.S3Uploader
{
public class QueueManager<T>
{
private readonly Queue queue = Queue.Synchronized(new Queue());
private readonly AutoResetEvent res = new AutoResetEvent(true);
private readonly AutoResetEvent res_thr = new AutoResetEvent(true);
private readonly Semaphore sem = new Semaphore(1, 4);
private readonly Thread thread;
private Action<T> DoWork;
private int Num_Of_Threads;
private QueueManager()
{
Num_Of_Threads = 0;
maxThread = 5;
thread = new Thread(Worker) {Name = "S3Uploader EventRegisterer"};
thread.Start();
// log.Info(String.Format("{0} [QUEUE] FileUploadQueueManager created", DateTime.Now.ToLongTimeString()));
}
public int maxThread { get; set; }
public static FileUploadQueueManager<T> Instance
{
get { return Nested.instance; }
}
/// <summary>
/// Executes multythreaded operation under items
/// </summary>
/// <param name="list">List of items to proceed</param>
/// <param name="action">Action under item</param>
/// <param name="MaxThreads">Maximum threads</param>
public void Execute(IEnumerable<T> list, Action<T> action, int MaxThreads)
{
maxThread = MaxThreads;
DoWork = action;
foreach (T item in list)
{
Add(item);
}
}
public void ExecuteNoThread(IEnumerable<T> list, Action<T> action)
{
ExecuteNoThread(list, action, 0);
}
public void ExecuteNoThread(IEnumerable<T> list, Action<T> action, int MaxThreads)
{
foreach (T wallpaper in list)
{
action(wallpaper);
}
}
/// <summary>
/// Default 10 threads
/// </summary>
/// <param name="list"></param>
/// <param name="action"></param>
public void Execute(IEnumerable<T> list, Action<T> action)
{
Execute(list, action, 10);
}
private void Add(T item)
{
lock (queue)
{
queue.Enqueue(item);
}
res.Set();
}
private void Worker()
{
while (true)
{
if (queue.Count == 0)
{
res.WaitOne();
}
if (Num_Of_Threads < maxThread)
{
var t = new Thread(Proceed);
t.Start();
}
else
{
res_thr.WaitOne();
}
}
}
private void Proceed()
{
Interlocked.Increment(ref Num_Of_Threads);
if (queue.Count > 0)
{
var item = (T) queue.Dequeue();
sem.WaitOne();
ProceedItem(item);
sem.Release();
}
res_thr.Set();
Interlocked.Decrement(ref Num_Of_Threads);
}
private void ProceedItem(T activity)
{
if (DoWork != null)
DoWork(activity);
lock (Instance)
{
Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
Num_Of_Threads);
}
}
#region Nested type: Nested
protected class Nested
{
// Explicit static constructor to tell C# compiler
// not to mark type as beforefieldinit
internal static readonly QueueManager<T> instance = new FileUploadQueueManager<T>();
}
#endregion
}
Run Code Online (Sandbox Code Playgroud)
}
问题在这里:
Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
Num_Of_Threads);
Run Code Online (Sandbox Code Playgroud)
标题中始终有一个线程ID.程序似乎在一个线程中工作.
样品用法:
var i_list = new int[] {1, 2, 4, 5, 6, 7, 8, 6};
QueueManager<int>.Instance.Execute(i_list,
i =>
{
Console.WriteLine("Some action under element number {0}", i);
}, 5);
Run Code Online (Sandbox Code Playgroud)
PS:它很乱,但我还在努力.
我查看了你的代码,这里有一些我看过的问题.
Proceed方法不是线程安全的.这两行是问题所在
if (queue.Count > 0) { var item = (T)queue.Dequeue(); ... }
使用同步队列仅保证单个访问是安全的.因此.Count和.Dequeue方法都不会混淆队列的内部结构.但是,想象一下两个线程同时运行这些代码行并且计数为1的队列的情况
Worker和Proceed之间存在竞争条件,可能导致死锁.应切换以下两行代码.
码:
res_thr.Set() Interlocked.Decrement(ref Num_Of_Threads);
第一行将取消阻止Worker方法.如果它运行得足够快,它将返回到外观,注意Num_Of_Threads <maxThreads并直接返回res_thr.WaitOne().如果当前没有其他线程正在运行,那么这将导致代码中出现死锁.使用较少的最大线程数(例如1)非常容易命中.反转这两行代码应该可以解决问题.
归档时间: |
|
查看次数: |
1286 次 |
最近记录: |