任务TPL的Thread.Interrupt等价物

Lor*_*tté 8 .net c# multithreading task-parallel-library

一些背景知识:我的C#代码调用了一些阻塞等待的非托管代码(C++).阻塞等待,但是,可报警(像Thread.Sleep-我想它调用WaitForSingleObjectExbAlertable TRUE下盖); 我肯定知道它是警觉的,因为它可以被"唤醒" QueueUserAPC.

如果我可以简单地使用托管线程,我只会调用阻塞方法,然后Thread.Interrupt在需要它时退出时使用"唤醒"线程; 这样的事情:

void ThreadFunc() {
    try {
           Message message;
           comObject.GetMessage(out message);
           //....
     }
     catch (ThreadInterruptedException) {
        // We need to exit
        return;
     }
}

var t - new Thread(ThreadFunc);
//....
t.Interrupt();
Run Code Online (Sandbox Code Playgroud)

(注意:我没有使用这个代码,但据我所知,它可能适用于这种特殊情况(在我的控制之外的非托管代码中可警告等待).我正在寻找的是最好的相当于(或更好的替代!)在TPL中的这个.

但是我必须使用TPL(任务而不是托管的线程),并且非托管方法不受我的控制(WaitForMultipleObjectEx例如,当我向事件发出信号时,我无法修改它以调用它并使其返回).

我正在寻找一个Thread.Interrupt等价的任务(将在底层线程上发布APC的东西).AFAIK,CancellationTokens要求代码是"任务感知",并且不要使用这种技术,但我不确定:发生了什么,我想知道,如果任务执行了Thread.Sleep(我知道有一个Task.Wait,但它只是为了拥有一个可以警告的非任务等待的例子,可以取消吗?

我的假设是错的(我的意思是,我可以只使用CT,一切都会起作用吗?但是如何?).

如果没有这样的方法......我愿意接受建议.我真的想避免混合线程和任务,或使用P/Invoke,但如果没有别的办法,我仍然希望以"最干净"的方式做到这一点(这意味着:没有粗鲁的中止,并且"Tasky":))

编辑:

对于那些好奇的人,我已经"确认"Thread.Interrupt可以在我的情况下工作,因为它调用QueueUserAPC.InterruptInternal然后Thread::UserInterrupt,它调用AlertAPC排队.它实际上非常聪明,因为它允许您休眠/等待然后唤醒线程而无需使用其他同步原语.

我只需要找到遵循相同流程的TPL原语

nos*_*tio 4

目前,所有现有的生产 CLR 主机都实现一对一的托管到非托管线程映射。对于运行旧版 COM 对象的 Windows 桌面操作系统系列尤其如此。

有鉴于此,您可以使用 TPLTask.Run代替传统的线程 API,并且QueueUserAPC在触发取消令牌时仍然通过 p/invoke 进行调用,以将 COM 对象从可更改的等待状态中释放。

下面的代码展示了如何做到这一点。需要注意的一件事是,所有ThreadPool线程(包括由 启动的线程)都隐式在COM MTA 公寓Task.Run下运行。因此,COM 对象需要支持 MTA 模型,而无需隐式 COM 封送。如果不是这种情况,您可能需要使用自定义任务计划程序(如 )而不是.StaTaskSchedulerTask.Run

using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    class Program
    {
        static int ComGetMessage()
        {
            NativeMethods.SleepEx(2000, true);
            return 42;
        }

        static int GetMessage(CancellationToken token)
        {
            var apcWasCalled = false;
            var gcHandle = default(GCHandle);
            var apcCallback = new NativeMethods.APCProc(target => 
            {
                apcWasCalled = true;
                gcHandle.Free();
            });

            var hCurThread = NativeMethods.GetCurrentThread();
            var hCurProcess = NativeMethods.GetCurrentProcess();
            IntPtr hThread;
            if (!NativeMethods.DuplicateHandle(
                hCurProcess, hCurThread, hCurProcess, out hThread,
                0, false, NativeMethods.DUPLICATE_SAME_ACCESS))
            {
                throw new System.ComponentModel.Win32Exception(Marshal.GetLastWin32Error());
            }
            try
            {
                int result;
                using (token.Register(() => 
                    {
                        gcHandle = GCHandle.Alloc(apcCallback);
                        NativeMethods.QueueUserAPC(apcCallback, hThread, UIntPtr.Zero);
                    },
                    useSynchronizationContext: false))
                {
                    result = ComGetMessage();
                }
                Trace.WriteLine(new { apcWasCalled });
                token.ThrowIfCancellationRequested();
                return result;
            }
            finally
            {
                NativeMethods.CloseHandle(hThread);
            }
        }

        static async Task TestAsync(int delay)
        {
            var cts = new CancellationTokenSource(delay);
            try
            {
                var result = await Task.Run(() => GetMessage(cts.Token));
                Console.WriteLine(new { result });
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Cancelled.");
            }
        }

        static void Main(string[] args)
        {
            TestAsync(3000).Wait();
            TestAsync(1000).Wait();
        }

        static class NativeMethods
        {
            public delegate void APCProc(UIntPtr dwParam);

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern uint SleepEx(uint dwMilliseconds, bool bAlertable);

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern uint QueueUserAPC(APCProc pfnAPC, IntPtr hThread, UIntPtr dwData);

            [DllImport("kernel32.dll")]
            public static extern IntPtr GetCurrentThread();

            [DllImport("kernel32.dll")]
            public static extern IntPtr GetCurrentProcess();

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern bool CloseHandle(IntPtr handle);

            public const uint DUPLICATE_SAME_ACCESS = 2;

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern bool DuplicateHandle(IntPtr hSourceProcessHandle,
               IntPtr hSourceHandle, IntPtr hTargetProcessHandle, out IntPtr lpTargetHandle,
               uint dwDesiredAccess, bool bInheritHandle, uint dwOptions);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 非常好的答案,我将采用这种方法,因为取消令牌本身似乎没有发送“隐式”APC。只有一个观察结果:即使你的前提是正确的,我也会特别安全并调用 [BeginThreadAffinity](https://msdn.microsoft.com/en-us/library/system.threading.thread.beginthreadaffinity%28v=vs .110%29.aspx) 和 EndThreadAffinity (2认同)