让多个线程正常工作并等待所有线程完成的最佳方法是什么?

BFr*_*ree 13 c# multithreading threadpool

我正在编写一个简单的应用程序(对于我的妻子而言:-P),它可以对潜在的大批图像进行一些图像处理(调整大小,时间戳等).所以我正在编写一个可以同步和异步执行此操作的库.我决定使用基于事件的异步模式.使用此模式时,您需要在工作完成后引发事件.这是我知道什么时候完成的问题.所以基本上,在我的DownsizeAsync方法(缩小图像的异步方法)中,我正在做这样的事情:

    public void DownsizeAsync(string[] files, string destination)
    {
        foreach (var name in files)
        {
            string temp = name; //countering the closure issue
            ThreadPool.QueueUserWorkItem(f =>
            {
                string newFileName = this.DownsizeImage(temp, destination);
                this.OnImageResized(newFileName);
            });
        }
     }
Run Code Online (Sandbox Code Playgroud)

现在棘手的部分是知道什么时候它们都完整了.

这就是我所考虑的:使用像这里的ManualResetEvents:http://msdn.microsoft.com/en-us/library/3dasc8as%28VS.80%29.aspx但我遇到的问题是你只能等待64次或更少的事件.我可能有更多的图像.

第二个选项:有一个计数器来计算已完成的图像,并在计数达到总数时引发事件:

public void DownsizeAsync(string[] files, string destination)
{
    foreach (var name in files)
    {
        string temp = name; //countering the closure issue
        ThreadPool.QueueUserWorkItem(f =>
        {
            string newFileName = this.DownsizeImage(temp, destination);
            this.OnImageResized(newFileName);
            total++;
            if (total == files.Length)
            {
                this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
            }
        });
    }


}

private volatile int total = 0;
Run Code Online (Sandbox Code Playgroud)

现在这感觉"hacky",我不完全确定这是否安全.

所以,我的问题是,这样做的最佳方法是什么?有没有其他方法来同步所有线程?我不应该使用ThreadPool吗?谢谢!!

更新根据评论中的反馈和几个答案,我决定采用这种方法:

首先,我创建了一个扩展方法,将可枚举的批处理批处理为"批处理":

    public static IEnumerable<IEnumerable<T>> GetBatches<T>(this IEnumerable<T> source, int batchCount)
    {
        for (IEnumerable<T> s = source; s.Any(); s = s.Skip(batchCount))
        {
            yield return s.Take(batchCount);
        }
    }
Run Code Online (Sandbox Code Playgroud)

基本上,如果你做这样的事情:

        foreach (IEnumerable<int> batch in Enumerable.Range(1, 95).GetBatches(10))
        {
            foreach (int i in batch)
            {
                Console.Write("{0} ", i);
            }
            Console.WriteLine();
        }
Run Code Online (Sandbox Code Playgroud)

你得到这个输出:

1 2 3 4 5 6 7 8 9 10
11 12 13 14 15 16 17 18 19 20
21 22 23 24 25 26 27 28 29 30
31 32 33 34 35 36 37 38 39 40
41 42 43 44 45 46 47 48 49 50
51 52 53 54 55 56 57 58 59 60
61 62 63 64 65 66 67 68 69 70
71 72 73 74 75 76 77 78 79 80
81 82 83 84 85 86 87 88 89 90
91 92 93 94 95
Run Code Online (Sandbox Code Playgroud)

这个想法(正如评论中指出的那样)没有必要为每个图像创建一个单独的线程.因此,我将图像批量转换为[machine.cores*2]批次.然后,我将使用我的第二种方法,这只是为了保持计数器的运行,当计数器达到我期望的总数时,我会知道我已经完成了.

我现在确信它实际上是线程安全的原因是因为我根据MSDN将总变量标记为volatile :

volatile修饰符通常用于多个线程访问的字段,而不使用lock语句来序列化访问.使用volatile修饰符可确保一个线程检索另一个线程写入的最新值

意思是我应该清楚(如果没有,请让我知道!)

所以这是我要使用的代码:

    public void DownsizeAsync(string[] files, string destination)
    {
        int cores = Environment.ProcessorCount * 2;
        int batchAmount = files.Length / cores;

        foreach (var batch in files.GetBatches(batchAmount))
        {
            var temp = batch.ToList(); //counter closure issue
            ThreadPool.QueueUserWorkItem(b =>
            {
                foreach (var item in temp)
                {
                    string newFileName = this.DownsizeImage(item, destination);
                    this.OnImageResized(newFileName);
                    total++;
                    if (total == files.Length)
                    {
                        this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
                    }
                }
            });
        }
    }
Run Code Online (Sandbox Code Playgroud)

我愿意接受反馈,因为我不是多线程专家,所以如果有人对此有任何疑问,或者有更好的想法,请告诉我.(是的,这只是一个自制的应用程序,但我对如何利用我在这里获得的知识来改进我们在工作中使用的搜索/索引服务有一些想法.)现在我将这个问题保持开放直到我我觉得我正在使用正确的方法.谢谢大家的帮助.

Jon*_*eet 11

最简单的方法是创建新线程,然后调用Thread.Join每个线程.你可以使用信号量或类似的东西 - 但是创建新线程可能更容易.

在.NET 4.0中,您可以使用Parallel Extensions轻松完成任务.

作为另一种选择使用线程池,你可以创建一个委托,并调用BeginInvoke它,返回一个IAsyncResult-那么你就可以得到WaitHandle通过为每个结果AsyncWaitHandle属性,并调用WaitHandle.WaitAll.

编辑:正如评论中所指出的,在WaitAll某些实现中,您一次最多只能调用64个句柄.备选方案可以WaitOne依次调用每个方法,也WaitAll可以批量调用.它不会真正重要,只要你从一个不会阻塞线程池的线程中做到这一点.另请注意,您无法WaitAll从STA线程调用.

  • Jon-你忘记了WaitAll的64限制. (2认同)

Mik*_*nty 11

您仍然希望使用ThreadPool,因为它将管理它同时运行的线程数.我最近遇到了类似的问题并解决了这个问题:

var dispatcher = new ThreadPoolDispatcher();
dispatcher = new ChunkingDispatcher(dispatcher, 10);

foreach (var image in images)
{
    dispatcher.Add(new ResizeJob(image));
}

dispatcher.WaitForJobsToFinish();
Run Code Online (Sandbox Code Playgroud)

IDispatcher和IJob看起来像这样:

public interface IJob
{
    void Execute();
}

public class ThreadPoolDispatcher : IDispatcher
{
    private IList<ManualResetEvent> resetEvents = new List<ManualResetEvent>();

    public void Dispatch(IJob job)
    {
        var resetEvent = CreateAndTrackResetEvent();
        var worker = new ThreadPoolWorker(job, resetEvent);
        ThreadPool.QueueUserWorkItem(new WaitCallback(worker.ThreadPoolCallback));
    }

    private ManualResetEvent CreateAndTrackResetEvent()
    {
        var resetEvent = new ManualResetEvent(false);
        resetEvents.Add(resetEvent);
        return resetEvent;
    }

    public void WaitForJobsToFinish()
    {
        WaitHandle.WaitAll(resetEvents.ToArray() ?? new ManualResetEvent[] { });
        resetEvents.Clear();
    }
}
Run Code Online (Sandbox Code Playgroud)

然后使用装饰器来填充ThreadPool的使用:

public class ChunkingDispatcher : IDispatcher
{
    private IDispatcher dispatcher;
    private int numberOfJobsDispatched;
    private int chunkSize;

    public ChunkingDispatcher(IDispatcher dispatcher, int chunkSize)
    {
        this.dispatcher = dispatcher;
        this.chunkSize = chunkSize;
    }

    public void Dispatch(IJob job)
    {
        dispatcher.Dispatch(job);

        if (++numberOfJobsDispatched % chunkSize == 0)
            WaitForJobsToFinish();
    }

    public void WaitForJobsToFinish()
    {
        dispatcher.WaitForJobsToFinish();
    }
}
Run Code Online (Sandbox Code Playgroud)

IDispatcher抽象非常适合交换线程技术.我有一个SingleThreadedDispatcher的另一个实现,你可以创建像Jon Skeet建议的ThreadStart版本.然后很容易运行每一个,看看你得到了什么样的性能.调试代码或不想杀死盒子上的处理器时,SingleThreadedDispatcher很好.

编辑:我忘了为ThreadPoolWorker添加代码:

public class ThreadPoolWorker
{
    private IJob job;
    private ManualResetEvent doneEvent;

    public ThreadPoolWorker(IJob job, ManualResetEvent doneEvent)
    {
        this.job = job;
        this.doneEvent = doneEvent;
    }

    public void ThreadPoolCallback(object state)
    {
        try
        {
            job.Execute();
        }
        finally
        {
            doneEvent.Set();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)


小智 5

最简单有效的解决方案是使用计数器并使其线程安全.这将消耗更少的内存,并可以扩展到更多的线程

这是一个例子

int itemCount = 0;
for (int i = 0; i < 5000; i++)
{
    Interlocked.Increment(ref itemCount);

    ThreadPool.QueueUserWorkItem(x=>{
        try
        {
            //code logic here.. sleep is just for demo
            Thread.Sleep(100);
        }
        finally
        {
            Interlocked.Decrement(ref itemCount);
        }
    });
}

while (itemCount > 0)
{
    Console.WriteLine("Waiting for " + itemCount + " threads...");
    Thread.Sleep(100);
}
Console.WriteLine("All Done!");
Run Code Online (Sandbox Code Playgroud)