在Windows服务中以低于正常优先级停止Parallel.ForEach

Dia*_*cus 9 c# windows-services task-parallel-library

我的Windows服务中有使用Parallel.ForEach的代码。如果ParallelOptions.MaxDegreeOfParallelism设置为-1,它会占用我的大部分CPU。然而,停止服务需要持续半分钟。某个本应接收"服务应该停止"信号的内部控制线程得不到处理器时间(被饿死)。我将进程优先级设置为低于正常值,但这可能是不相关的信息。

即使所有线程都忙,我还能做些什么来缩短停止服务的时间?

我正在考虑暂时降低线程池中线程的优先级,因为我没有任何异步代码,但网上的说法是这是一个坏主意,所以在这里请教"正确"的做法。

在所有情况下,OnStart和OnStop所在的线程(包括操作系统线程和.NET线程)都不相同。此外,如果停止时间非常长,那么最终调用OnStop的操作系统线程有时是一个新线程,而不是日志中先前出现过的线程。

要构建此代码,请创建新的Windows服务项目,从设计器添加ProjectInstaller类,将Account更改为LocalService,然后使用InstallUtil安装一次.确保LocalService可以写入C:\ Temp.

/// <summary>
/// Sample Windows service that, once started, walks every file in
/// <see cref="Environment.SystemDirectory"/> with <c>Parallel.ForEach</c>, computes a
/// SHA-256 hash over the file's sorted bytes and appends the result to
/// <c>C:\Temp\Log.txt</c>. Stopping the service signals the loop and waits for it to end.
/// </summary>
public partial class Service1 : ServiceBase
{
    // Set by OnStop; every loop iteration polls it before doing any work.
    private ManualResetEvent stopEvent = new ManualResetEvent(false);
    private Task mainTask;
    private StreamWriter writer = File.AppendText(@"C:\Temp\Log.txt");

    // StreamWriter instances are not safe for concurrent use, and Log is called from
    // the service control thread and from every Parallel.ForEach worker at once.
    private readonly object logLock = new object();

    public Service1()
    {
        InitializeComponent();

        writer.AutoFlush = true;
    }

    /// <summary>Logs the start marker and launches <see cref="Run"/> on a background task.</summary>
    protected override void OnStart(string[] args)
    {
        Log("--------------");
        Log("OnStart");

        mainTask = Task.Run(new Action(Run));
    }

    /// <summary>Signals the worker loop to stop and blocks until it has finished.</summary>
    protected override void OnStop()
    {
        Log("OnStop");
        stopEvent.Set();

        // Guard against OnStop being reached without a successful OnStart.
        if (mainTask != null)
        {
            mainTask.Wait();
        }

        Log("--------------");
    }

    /// <summary>
    /// Writes one line to the log file, prefixed with a timestamp and the managed id
    /// of the calling thread.
    /// </summary>
    /// <param name="line">Text to append after the prefix.</param>
    private void Log(string line)
    {
        string entry = String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",
            DateTime.Now, Thread.CurrentThread.ManagedThreadId, line);

        lock (logLock)
        {
            writer.WriteLine(entry);
        }
    }

    /// <summary>
    /// Body of the background task: hashes each file in the system directory in parallel
    /// until all files are processed or <see cref="stopEvent"/> is set.
    /// </summary>
    private void Run()
    {
        try
        {
            var parallelOptions = new ParallelOptions();
            parallelOptions.MaxDegreeOfParallelism = -1;

            Parallel.ForEach(Directory.EnumerateFiles(Environment.SystemDirectory),
                parallelOptions, (fileName, parallelLoopState) =>
            {
                if (stopEvent.WaitOne(0))
                {
                    Log("Stop requested");
                    parallelLoopState.Stop();
                    return;
                }

                try
                {
                    byte[] sortedBytes = File.ReadAllBytes(fileName).OrderBy(x => x).ToArray();

                    // A hash algorithm instance keeps internal state and must not be shared
                    // between concurrently running iterations, so each one gets its own.
                    byte[] hash;
                    using (var sha = SHA256.Create())
                    {
                        hash = sha.ComputeHash(sortedBytes);
                    }

                    Log(String.Format("file={0}, sillyhash={1}", fileName, Convert.ToBase64String(hash)));
                }
                catch (Exception ex)
                {
                    // Best effort: an unreadable file is logged and skipped.
                    Log(String.Format("file={0}, exception={1}", fileName, ex.Message));
                }
            });
        }
        catch (Exception ex)
        {
            Log(String.Format("exception={0}", ex.Message));
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Eri*_*let 2

这是一个工作代码。它立即停止。请注意,主要思想来自:SylF。

但我无法给出一个明确的解释为什么会发生这种情况......更新(在下面的评论之后):您找到了原因,并且它很好地解释了为什么您有这种行为。谢谢!我真的很高兴知道。

尽管这项工作是在低优先级线程中完成的,但在 CPU 几乎没有工作的机器上,您不应该注意到任何额外的延迟。

抱歉,为了做一些测试,我把您的代码示例改得有些乱。但主要的想法是更换任务调度器(这似乎并不被推荐),不过这是我找到的唯一方法。

代码:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace StackOverflowQuestionWindowsService1
{
    /// <summary>
    /// Variant of the sample service that runs the parallel hashing loop on a
    /// <see cref="PriorityScheduler"/> with <see cref="ThreadPriority.Lowest"/> threads and
    /// stops it through a <see cref="CancellationToken"/>. Timing information for each file
    /// and for the whole run is written to <c>C:\Temp\Log.txt</c>.
    /// </summary>
    public partial class Service1 : ServiceBase
    {
        // Not referenced in this part of the partial class; kept in case another part uses it.
        private ManualResetEvent stopEvent = new ManualResetEvent(false);
        private Task mainTask;
        private StreamWriter writer = File.CreateText(@"C:\Temp\Log.txt");     //TAKE CARE - I do not append anymore  ********
        private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

        // Number of loop iterations started so far; only updated through Interlocked.
        private int count = 0;

        // StreamWriter instances are not safe for concurrent use, and Log is called from
        // the service control thread and from every scheduler thread at once.
        private readonly object logLock = new object();

        public Service1()
        {
            InitializeComponent();

            writer.AutoFlush = true;
        }

        /// <summary>Logs the start marker and launches <see cref="Run"/> on a background task.</summary>
        protected override void OnStart(string[] args)
        {
            Log("--------------");
            Log("OnStart");

            mainTask = Task.Run(() => Run());
        }

        /// <summary>
        /// Logs the current thread count of the process and requests cancellation of the
        /// parallel loop. Does not wait for the loop to finish.
        /// </summary>
        protected override void OnStop()
        {
            int threadCount;
            using (Process currentProcess = Process.GetCurrentProcess())
            {
                threadCount = currentProcess.Threads.Count;
            }

            Log("OnStop with actual thread count: " + threadCount);

            cancellationTokenSource.Cancel();
        }

        /// <summary>
        /// Writes one line to the log file, prefixed with a timestamp and the managed id
        /// of the calling thread.
        /// </summary>
        /// <param name="line">Text to append after the prefix.</param>
        private void Log(string line)
        {
            string entry = String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",
                DateTime.Now, Thread.CurrentThread.ManagedThreadId, line);

            lock (logLock)
            {
                writer.WriteLine(entry);
            }
        }

        /// <summary>
        /// Body of the background task: hashes each file in the system directory in parallel
        /// on lowest-priority threads, logs the total elapsed time, closes the log and then
        /// terminates the process.
        /// </summary>
        private void Run()
        {
            Stopwatch stopWatchTotal = Stopwatch.StartNew();

            try
            {
                var parallelOptions = new ParallelOptions();
                parallelOptions.MaxDegreeOfParallelism = -1;
                parallelOptions.CancellationToken = cancellationTokenSource.Token;
                parallelOptions.TaskScheduler = new PriorityScheduler(ThreadPriority.Lowest);

                Parallel.ForEach(Directory.EnumerateFiles(Environment.SystemDirectory),
                    parallelOptions, (fileName, parallelLoopState) =>
                    {
                        Stopwatch stopWatch = Stopwatch.StartNew();

                        // Use the value returned by the increment: re-reading the shared
                        // field could report a number bumped by another iteration.
                        int iteration = Interlocked.Increment(ref count);

                        if (parallelOptions.CancellationToken.IsCancellationRequested)
                        {
                            Log($"{iteration}");
                            return;
                        }

                        try
                        {
                            byte[] sortedBytes = File.ReadAllBytes(fileName).OrderBy(x => x).ToArray();

                            // A hash algorithm instance keeps internal state and must not be
                            // shared between concurrently running iterations.
                            byte[] hash;
                            using (var sha = SHA256.Create())
                            {
                                hash = sha.ComputeHash(sortedBytes);
                            }

                            stopWatch.Stop();
                            string elapsed = FormatTicks(stopWatch.ElapsedTicks);
                            Log(elapsed);

                            // Plain interpolation only: wrapping it in String.Format would
                            // re-parse the text and throw on any '{' or '}' in the file name.
                            Log($"{iteration}, {elapsed}, file={fileName}, sillyhash={Convert.ToBase64String(hash)}");
                        }
                        catch (Exception ex)
                        {
                            // Best effort: an unreadable file is logged and skipped.
                            Log($"{iteration} file={fileName}, exception={ex.Message}");
                        }
                    });
            }
            catch (Exception ex)
            {
                // Also reached when the loop is cancelled through the token.
                Log($"exception={ex.Message}");
            }

            stopWatchTotal.Stop();

            Log(FormatTicks(stopWatchTotal.ElapsedTicks));

            lock (logLock)
            {
                writer.Close();
            }

            // NOTE(review): kills the whole process as soon as the loop ends, whether it
            // completed or was cancelled. Looks like test scaffolding - confirm before reuse.
            Process.GetCurrentProcess().Kill();
        }

        /// <summary>
        /// Formats a <see cref="Stopwatch"/> tick count as a <see cref="TimeSpan"/> string.
        /// </summary>
        /// <param name="ticks">Elapsed ticks as reported by <see cref="Stopwatch.ElapsedTicks"/>.</param>
        /// <returns>The elapsed time in the default <see cref="TimeSpan"/> format.</returns>
        private string FormatTicks(long ticks)
        {
            // Stopwatch ticks are counted in units of 1/Stopwatch.Frequency seconds, while
            // TimeSpan ticks are always 100 ns; convert before building the TimeSpan.
            double timeSpanTicksPerStopwatchTick = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;
            return new TimeSpan((long)(ticks * timeSpanTicksPerStopwatchTick)).ToString();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

优先级调度程序:(感谢StackOverflow上的 Roman Starkov ,来自Microsoft的 Bnaya Eshet )

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace StackOverflowQuestionWindowsService1
{
    /// <summary>
    /// Task scheduler that executes its tasks on a dedicated set of background threads,
    /// all created with the <see cref="ThreadPriority"/> passed to the constructor.
    /// </summary>
    /// <remarks>
    /// The worker threads are created when the first task is queued, one per
    /// <see cref="MaximumConcurrencyLevel"/>. This class never shuts them down.
    /// </remarks>
    public class PriorityScheduler : TaskScheduler
    {
        public static PriorityScheduler AboveNormal = new PriorityScheduler(ThreadPriority.AboveNormal);
        public static PriorityScheduler BelowNormal = new PriorityScheduler(ThreadPriority.BelowNormal);
        public static PriorityScheduler Lowest = new PriorityScheduler(ThreadPriority.Lowest);

        private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
        private readonly ThreadPriority _priority;
        private readonly int _maximumConcurrencyLevel = Math.Max(1, Environment.ProcessorCount);

        // Guards the one-time creation of _threads; QueueTask may be called concurrently.
        private readonly object _startLock = new object();
        private Thread[] _threads;

        /// <summary>Creates a scheduler whose worker threads run at <paramref name="priority"/>.</summary>
        /// <param name="priority">Priority assigned to every worker thread.</param>
        public PriorityScheduler(ThreadPriority priority)
        {
            _priority = priority;
        }

        /// <summary>Number of worker threads: the processor count, but at least one.</summary>
        public override int MaximumConcurrencyLevel
        {
            get { return _maximumConcurrencyLevel; }
        }

        /// <summary>Returns the tasks that are queued but not yet picked up by a worker.</summary>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks;
        }

        /// <summary>Queues <paramref name="task"/> and starts the worker threads on first use.</summary>
        /// <param name="task">Task to execute on one of the worker threads.</param>
        protected override void QueueTask(Task task)
        {
            _tasks.Add(task);
            EnsureWorkersStarted();
        }

        /// <summary>Always declines: tasks must run on the priority threads, never on the caller.</summary>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false; // we might not want to execute task that should schedule as high or low priority inline
        }

        // Creates and starts the worker threads exactly once, even when several threads
        // queue their first task at the same time.
        private void EnsureWorkersStarted()
        {
            lock (_startLock)
            {
                if (_threads != null)
                {
                    return;
                }

                var workers = new Thread[_maximumConcurrencyLevel];
                for (int i = 0; i < workers.Length; i++)
                {
                    workers[i] = new Thread(ConsumeTasks);
                    workers[i].Name = string.Format("PriorityScheduler: {0}", i);
                    workers[i].Priority = _priority;
                    workers[i].IsBackground = true;
                    workers[i].Start();
                }

                _threads = workers;
            }
        }

        // Worker loop: blocks on the queue and executes tasks as they arrive.
        private void ConsumeTasks()
        {
            foreach (Task t in _tasks.GetConsumingEnumerable())
            {
                TryExecuteTask(t);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)