Parallel.ForEach使用自定义TaskScheduler来防止OutOfMemoryException

Tem*_*Tem 5 c# parallel-processing multithreading

我正在通过Parallel.ForEach处理各种大小的PDF(简单的2MB到几百MB的高DPI扫描)并且偶尔会遇到OutOfMemoryException - 可以理解的是由于进程是32位并且Parallel产生了线程. ForEach占用了大量未知的内存消耗工作.

限制MaxDegreeOfParallelism确实有效,尽管由于所述线程的内存占用量较小而导致有大量(10k +)批量的小型PDF需要处理时的吞吐量不足.这是一个CPU繁重的过程,在遇到偶尔的大型PDF组并获得OutOfMemoryException之前,Parallel.ForEach很容易达到100%的CPU.运行Performance Profiler会将其备份.

根据我的理解,为Parallel.ForEach设置分区器不会提高我的性能.

这导致我使用TaskScheduler传递给我的Parallel.ForEach 的自定义MemoryFailPoint检查.在它周围搜索似乎有关于创建自定义TaskScheduler对象的稀缺信息.

之间寻找在.NET专业任务调度4并行扩展附加功能,在C#中的自定义的TaskScheduler这里#2各种各样的回答,我已经建立了我自己的TaskScheduler,并有我的QueueTask方法,例如:

protected override void QueueTask(Task task)
{
    lock (tasks) tasks.AddLast(task);
    try
    {
        using (MemoryFailPoint memFailPoint = new MemoryFailPoint(600))
        {
            if (runningOrQueuedCount < maxDegreeOfParallelism)
            {
                runningOrQueuedCount++;
                RunTasks();
            }
        }
    }
    catch (InsufficientMemoryException e)
    {     
        // somehow return thread to pool?           
        Console.WriteLine("InsufficientMemoryException");
    }
}
Run Code Online (Sandbox Code Playgroud)

虽然try/catch有点贵,但我的目标是捕获600MB的可能最大大小PDF(+一点额外内存开销)将抛出OutOfMemoryException.当我捕获InsufficientMemoryException时,这个解决方案似乎杀掉了试图完成工作的线程.有了足够大的PDF,我的代码最终成为一个单一的线程Parallel.ForEach.

在Parallel.ForEach和OutOfMemoryExceptions上的Stackoverflow上发现的其他问题似乎不适合我在线程上使用动态内存的最大吞吐量的用例,并且通常只是MaxDegreeOfParallelism作为静态解决方案使用,例如:

因此,为了获得可变工作内存大小的最大吞吐量,可以:

  • 如果线程被拒绝通过MemoryFailPoint支票工作,我如何将线程返回到线程池中?
  • 当有空闲内存时,我如何/在哪里安全地生成新线程以重新开始工作?

编辑:由于光栅化和光栅化图像处理组件(取决于PDF内容),磁盘上的PDF大小可能不会线性表示内存中的大小.

Tem*_*Tem 1

使用.NET Framework 的并行编程示例,LimitedConcurrencyLevelTaskScheduler我可以进行一些细微的调整以获得与我想要的东西类似的东西。以下是修改后的类的方法:NotifyThreadPoolOfPendingWorkLimitedConcurrencyLevelTaskScheduler

private void NotifyThreadPoolOfPendingWork()
{
    ThreadPool.UnsafeQueueUserWorkItem(_ =>
    {
        // Note that the current thread is now processing work items.
        // This is necessary to enable inlining of tasks into this thread.
        _currentThreadIsProcessingItems = true;
        try
        {
            // Process all available items in the queue.
            while (true)
            {
                Task item;
                lock (_tasks)
                {
                    // When there are no more items to be processed,
                    // note that we're done processing, and get out.
                    if (_tasks.Count == 0)
                    {
                        --_delegatesQueuedOrRunning;
                        break;
                    }

                    // Get the next item from the queue
                    item = _tasks.First.Value;
                    _tasks.RemoveFirst();
                }

                // Execute the task we pulled out of the queue
                //base.TryExecuteTask(item);

                try
                {
                    using (MemoryFailPoint memFailPoint = new MemoryFailPoint(650))
                    {
                        base.TryExecuteTask(item);
                    }
                }
                catch (InsufficientMemoryException e)
                {
                    Thread.Sleep(500);

                    lock (_tasks)
                    {
                        _tasks.AddLast(item);
                    }
                }

            }
        }
        // We're done processing items on the current thread
        finally { _currentThreadIsProcessingItems = false; }
    }, null);
}
Run Code Online (Sandbox Code Playgroud)

我们将看看其中的陷阱,但相反。我们将要处理的任务添加回任务列表 ( _tasks),这会触发一个事件以获取可用线程来处理该工作。但是我们首先休眠当前线程,以便它不会直接继续工作并返回到失败的MemoryFailPoint检查。