Lor*_*tté 8 .net c# multithreading task-parallel-library
一些背景知识:我的C#代码调用了一些阻塞等待的非托管代码(C++).阻塞等待,但是,可报警(像Thread.Sleep-我想它调用WaitForSingleObjectEx与bAlertable TRUE下盖); 我肯定知道它是警觉的,因为它可以被"唤醒" QueueUserAPC.
如果我可以简单地使用托管线程,我只会调用阻塞方法,然后Thread.Interrupt在需要它时退出时使用"唤醒"线程; 这样的事情:
void ThreadFunc() {
try {
Message message;
comObject.GetMessage(out message);
//....
}
catch (ThreadInterruptedException) {
// We need to exit
return;
}
}
var t - new Thread(ThreadFunc);
//....
t.Interrupt();
Run Code Online (Sandbox Code Playgroud)
(注意:我没有使用这个代码,但据我所知,它可能适用于这种特殊情况(在我的控制之外的非托管代码中可警告等待).我正在寻找的是最好的相当于(或更好的替代!)在TPL中的这个.
但是我必须使用TPL(任务而不是托管的线程),并且非托管方法不受我的控制(WaitForMultipleObjectEx例如,当我向事件发出信号时,我无法修改它以调用它并使其返回).
我正在寻找一个Thread.Interrupt等价的任务(将在底层线程上发布APC的东西).AFAIK,CancellationTokens要求代码是"任务感知",并且不要使用这种技术,但我不确定:发生了什么,我想知道,如果任务执行了Thread.Sleep(我知道有一个Task.Wait,但它只是为了拥有一个可以警告的非任务等待的例子,可以取消吗?
我的假设是错的(我的意思是,我可以只使用CT,一切都会起作用吗?但是如何?).
如果没有这样的方法......我愿意接受建议.我真的想避免混合线程和任务,或使用P/Invoke,但如果没有别的办法,我仍然希望以"最干净"的方式做到这一点(这意味着:没有粗鲁的中止,并且"Tasky":))
对于那些好奇的人,我已经"确认"Thread.Interrupt可以在我的情况下工作,因为它调用QueueUserAPC.InterruptInternal然后Thread::UserInterrupt,它调用AlertAPC排队.它实际上非常聪明,因为它允许您休眠/等待然后唤醒线程而无需使用其他同步原语.
我只需要找到遵循相同流程的TPL原语
目前,所有现有的生产 CLR 主机都实现一对一的托管到非托管线程映射。对于运行旧版 COM 对象的 Windows 桌面操作系统系列尤其如此。
有鉴于此,您可以使用 TPLTask.Run代替传统的线程 API,并且QueueUserAPC在触发取消令牌时仍然通过 p/invoke 进行调用,以将 COM 对象从可更改的等待状态中释放。
下面的代码展示了如何做到这一点。需要注意的一件事是,所有ThreadPool线程(包括由 启动的线程)都隐式在COM MTA 公寓Task.Run下运行。因此,COM 对象需要支持 MTA 模型,而无需隐式 COM 封送。如果不是这种情况,您可能需要使用自定义任务计划程序(如 )而不是.StaTaskSchedulerTask.Run
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
class Program
{
static int ComGetMessage()
{
NativeMethods.SleepEx(2000, true);
return 42;
}
static int GetMessage(CancellationToken token)
{
var apcWasCalled = false;
var gcHandle = default(GCHandle);
var apcCallback = new NativeMethods.APCProc(target =>
{
apcWasCalled = true;
gcHandle.Free();
});
var hCurThread = NativeMethods.GetCurrentThread();
var hCurProcess = NativeMethods.GetCurrentProcess();
IntPtr hThread;
if (!NativeMethods.DuplicateHandle(
hCurProcess, hCurThread, hCurProcess, out hThread,
0, false, NativeMethods.DUPLICATE_SAME_ACCESS))
{
throw new System.ComponentModel.Win32Exception(Marshal.GetLastWin32Error());
}
try
{
int result;
using (token.Register(() =>
{
gcHandle = GCHandle.Alloc(apcCallback);
NativeMethods.QueueUserAPC(apcCallback, hThread, UIntPtr.Zero);
},
useSynchronizationContext: false))
{
result = ComGetMessage();
}
Trace.WriteLine(new { apcWasCalled });
token.ThrowIfCancellationRequested();
return result;
}
finally
{
NativeMethods.CloseHandle(hThread);
}
}
static async Task TestAsync(int delay)
{
var cts = new CancellationTokenSource(delay);
try
{
var result = await Task.Run(() => GetMessage(cts.Token));
Console.WriteLine(new { result });
}
catch (OperationCanceledException)
{
Console.WriteLine("Cancelled.");
}
}
static void Main(string[] args)
{
TestAsync(3000).Wait();
TestAsync(1000).Wait();
}
static class NativeMethods
{
public delegate void APCProc(UIntPtr dwParam);
[DllImport("kernel32.dll", SetLastError = true)]
public static extern uint SleepEx(uint dwMilliseconds, bool bAlertable);
[DllImport("kernel32.dll", SetLastError = true)]
public static extern uint QueueUserAPC(APCProc pfnAPC, IntPtr hThread, UIntPtr dwData);
[DllImport("kernel32.dll")]
public static extern IntPtr GetCurrentThread();
[DllImport("kernel32.dll")]
public static extern IntPtr GetCurrentProcess();
[DllImport("kernel32.dll", SetLastError = true)]
public static extern bool CloseHandle(IntPtr handle);
public const uint DUPLICATE_SAME_ACCESS = 2;
[DllImport("kernel32.dll", SetLastError = true)]
public static extern bool DuplicateHandle(IntPtr hSourceProcessHandle,
IntPtr hSourceHandle, IntPtr hTargetProcessHandle, out IntPtr lpTargetHandle,
uint dwDesiredAccess, bool bInheritHandle, uint dwOptions);
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
642 次 |
| 最近记录: |