我的自定义线程池有什么问题?

Alf*_*feG 0 .net c# lambda multithreading

我已经创建了一个自定义线程池实用程序,但似乎有一个我找不到的问题.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

namespace iWallpaper.S3Uploader
{
public class QueueManager<T>
{
    private readonly Queue queue = Queue.Synchronized(new Queue());
    private readonly AutoResetEvent res = new AutoResetEvent(true);
    private readonly AutoResetEvent res_thr = new AutoResetEvent(true);
    private readonly Semaphore sem = new Semaphore(1, 4);
    private readonly Thread thread;
    private Action<T> DoWork;
    private int Num_Of_Threads;

    private QueueManager()
    {
        Num_Of_Threads = 0;
        maxThread = 5;
        thread = new Thread(Worker) {Name = "S3Uploader EventRegisterer"};
        thread.Start();

        //   log.Info(String.Format("{0} [QUEUE] FileUploadQueueManager created", DateTime.Now.ToLongTimeString()));
    }

    public int maxThread { get; set; }

    public static FileUploadQueueManager<T> Instance
    {
        get { return Nested.instance; }
    }

    /// <summary>
    /// Executes multythreaded operation under items
    /// </summary>
    /// <param name="list">List of items to proceed</param>
    /// <param name="action">Action under item</param>
    /// <param name="MaxThreads">Maximum threads</param>
    public void Execute(IEnumerable<T> list, Action<T> action, int MaxThreads)
    {
        maxThread = MaxThreads;
        DoWork = action;
        foreach (T item in list)
        {
            Add(item);
        }
    }
    public void ExecuteNoThread(IEnumerable<T> list, Action<T> action)
    {
        ExecuteNoThread(list, action, 0);
    }
    public void ExecuteNoThread(IEnumerable<T> list, Action<T> action, int MaxThreads)
    {
        foreach (T wallpaper in list)
        {
            action(wallpaper);
        }
    }
    /// <summary>
    /// Default 10 threads
    /// </summary>
    /// <param name="list"></param>
    /// <param name="action"></param>
    public void Execute(IEnumerable<T> list, Action<T> action)
    {
        Execute(list, action, 10);
    }

    private void Add(T item)
    {
        lock (queue)
        {
            queue.Enqueue(item);
        }
        res.Set();
    }

    private void Worker()
    {
        while (true)
        {
            if (queue.Count == 0)
            {
                res.WaitOne();
            }

            if (Num_Of_Threads < maxThread)
            {
                var t = new Thread(Proceed);
                t.Start();
            }
            else
            {
                res_thr.WaitOne();
            }
        }
    }

    private void Proceed()
    {
        Interlocked.Increment(ref Num_Of_Threads);
        if (queue.Count > 0)
        {
            var item = (T) queue.Dequeue();

            sem.WaitOne();
            ProceedItem(item);
            sem.Release();
        }
        res_thr.Set();
        Interlocked.Decrement(ref Num_Of_Threads);
    }

    private void ProceedItem(T activity)
    {
        if (DoWork != null)
            DoWork(activity);

        lock (Instance)
        {
            Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
                                          thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
                                          Num_Of_Threads);
        }
    }

    #region Nested type: Nested

    protected class Nested
    {
        // Explicit static constructor to tell C# compiler
        // not to mark type as beforefieldinit
        internal static readonly QueueManager<T> instance = new FileUploadQueueManager<T>();
    }

    #endregion

}
Run Code Online (Sandbox Code Playgroud)

}

问题在这里:

Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
                                      thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
                                      Num_Of_Threads);
Run Code Online (Sandbox Code Playgroud)

标题中始终有一个线程ID.程序似乎在一个线程中工作.

样品用法:

        var i_list = new int[] {1, 2, 4, 5, 6, 7, 8, 6};
        QueueManager<int>.Instance.Execute(i_list,
          i =>
          {
              Console.WriteLine("Some action under element number {0}", i);

          }, 5);
Run Code Online (Sandbox Code Playgroud)

PS:它很乱,但我还在努力.

Jar*_*Par 5

我查看了你的代码,这里有一些我看过的问题.

  1. 即使它是同步队列,也会锁定队列对象.这是不必要的
  2. 您不一致地锁定队列对象.它应该为每次访问锁定或不锁定,具体取决于Synchronized行为.
  3. Proceed方法不是线程安全的.这两行是问题所在

        if (queue.Count > 0) {
          var item = (T)queue.Dequeue();
        ...
        }
    

    使用同步队列仅保证单个访问是安全的.因此.Count和.Dequeue方法都不会混淆队列的内部结构.但是,想象一下两个线程同时运行这些代码行并且计数为1的队列的情况

    • Thread1:if(...) - > true
    • Thread2:if(...) - > true
    • 线程1:出队 - >成功
    • Thread2:dequeue - >失败,因为队列为空
  4. Worker和Proceed之间存在竞争条件,可能导致死锁.应切换以下两行代码.

    码:

        res_thr.Set()
        Interlocked.Decrement(ref Num_Of_Threads);

    第一行将取消阻止Worker方法.如果它运行得足够快,它将返回到外观,注意Num_Of_Threads <maxThreads并直接返回res_thr.WaitOne().如果当前没有其他线程正在运行,那么这将导致代码中出现死锁.使用较少的最大线程数(例如1)非常容易命中.反转这两行代码应该可以解决问题.

  5. maxThread计数属性在4之后似乎没有用.sem对象被初始化为仅接受4个最大并发条目.实际执行项目的所有代码都必须通过此信号量.因此,无论maxThread设置多高,您都有效地将最大并发项数限制为4.