StaTaskScheduler和STA线程消息泵送

avo*_*avo 27 .net c# com task-parallel-library async-await

TL; DR:运行任务中的死锁StaTaskScheduler.长版:

我使用的是StaTaskSchedulerParallelExtensionsExtras中通过平行小组,举办由第三方提供的一些遗留STA COM对象.StaTaskScheduler实现细节的描述如下:

好消息是TPL的实现能够在MTA或STA线程上运行,并考虑到底层API的相关差异,如WaitHandle.WaitAll(当方法提供多个等待句柄时,它只支持MTA线程).

我认为这意味着TPL的阻塞部分将使用等待API来提供消息,例如CoWaitForMultipleHandles,以避免在STA线程上调用时出现死锁情况.

在我的情况下,我相信发生以下情况:进程内STA COM对象A调用进程外对象B,然后期望从B通过回调作为传出调用的一部分.

以简化形式:

var result = await Task.Factory.StartNew(() =>
{
    // in-proc object A
    var a = new A(); 
    // out-of-proc object B
    var b = new B(); 
    // A calls B and B calls back A during the Method call
    return a.Method(b);     
}, CancellationToken.None, TaskCreationOptions.None, staTaskScheduler);
Run Code Online (Sandbox Code Playgroud)

问题是,a.Method(b)永远不会回来.据我所知,这是因为内部阻塞等待BlockingCollection<Task>不会引发消息,因此我对引用语句的假设可能是错误的.

EDITED相同的代码工作测试WinForms应用程序的UI线程上执行时(即,提供TaskScheduler.FromCurrentSynchronizationContext()的,而不是staTaskSchedulerTask.Factory.StartNew).

解决这个问题的正确方法是什么?我应该实现一个自定义同步上下文,它将显式地使用消息CoWaitForMultipleHandles,并将其安装在每个启动的STA线程上StaTaskScheduler

如果是这样,底层实现是否BlockingCollection会调用我的SynchronizationContext.Wait方法?我可以SynchronizationContext.WaitHelper用来实现SynchronizationContext.Wait吗?


使用一些代码进行编辑,显示托管STA线程在执行阻塞等待时不会进行抽取.代码是一个完整的控制台应用程序,可以复制/粘贴/运行:

using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleTestApp
{
    class Program
    {
        // start and run an STA thread
        static void RunStaThread(bool pump)
        {
            // test a blocking wait with BlockingCollection.Take
            var tasks = new BlockingCollection<Task>();

            var thread = new Thread(() => 
            {
                // Create a simple Win32 window 
                var hwndStatic = NativeMethods.CreateWindowEx(0, "Static", String.Empty, NativeMethods.WS_POPUP,
                    0, 0, 0, 0, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero);

                // subclass it with a custom WndProc
                IntPtr prevWndProc = IntPtr.Zero;

                var newWndProc = new NativeMethods.WndProc((hwnd, msg, wParam, lParam) =>
                {
                    if (msg == NativeMethods.WM_TEST)
                        Console.WriteLine("WM_TEST processed");
                    return NativeMethods.CallWindowProc(prevWndProc, hwnd, msg, wParam, lParam);
                });

                prevWndProc = NativeMethods.SetWindowLong(hwndStatic, NativeMethods.GWL_WNDPROC, newWndProc);
                if (prevWndProc == IntPtr.Zero)
                    throw new ApplicationException();

                // post a test WM_TEST message to it
                NativeMethods.PostMessage(hwndStatic, NativeMethods.WM_TEST, IntPtr.Zero, IntPtr.Zero);

                // BlockingCollection blocks without pumping, NativeMethods.WM_TEST never arrives
                try { var task = tasks.Take(); }
                catch (Exception e) { Console.WriteLine(e.Message); }

                if (pump)
                {
                    // NativeMethods.WM_TEST will arrive, because Win32 MessageBox pumps
                    Console.WriteLine("Now start pumping...");
                    NativeMethods.MessageBox(IntPtr.Zero, "Pumping messages, press OK to stop...", String.Empty, 0);
                }
            });

            thread.SetApartmentState(ApartmentState.STA);
            thread.Start();

            Thread.Sleep(2000);

            // this causes the STA thread to end
            tasks.CompleteAdding(); 

            thread.Join();
        }

        static void Main(string[] args)
        {
            Console.WriteLine("Testing without pumping...");
            RunStaThread(false);

            Console.WriteLine("\nTest with pumping...");
            RunStaThread(true);

            Console.WriteLine("Press Enter to exit");
            Console.ReadLine();
        }
    }

    // Interop
    static class NativeMethods
    {
        [DllImport("user32")]
        public static extern IntPtr SetWindowLong(IntPtr hwnd, int nIndex, WndProc newProc);

        [DllImport("user32")]
        public static extern IntPtr CallWindowProc(IntPtr lpPrevWndFunc, IntPtr hwnd, int msg, int wParam, int lParam);

        [DllImport("user32.dll")]
        public static extern IntPtr CreateWindowEx(int dwExStyle, string lpClassName, string lpWindowName, int dwStyle, int x, int y, int nWidth, int nHeight, IntPtr hWndParent, IntPtr hMenu, IntPtr hInstance, IntPtr lpParam);

        [DllImport("user32.dll")]
        public static extern bool PostMessage(IntPtr hwnd, uint msg, IntPtr wParam, IntPtr lParam);

        [DllImport("user32.dll")]
        public static extern int MessageBox(IntPtr hwnd, string text, String caption, int options);

        public delegate IntPtr WndProc(IntPtr hwnd, int msg, int wParam, int lParam);

        public const int GWL_WNDPROC = -4;
        public const int WS_POPUP = unchecked((int)0x80000000);
        public const int WM_USER = 0x0400;

        public const int WM_TEST = WM_USER + 1;
    }
}
Run Code Online (Sandbox Code Playgroud)

这会产生输出:

Testing without pumping...
The collection argument is empty and has been marked as complete with regards to additions.

Test with pumping...
The collection argument is empty and has been marked as complete with regards to additions.
Now start pumping...
WM_TEST processed
Press Enter to exit

nos*_*tio 33

我对您的问题的理解:您StaTaskScheduler仅用于为传统的COM对象组织经典的COM STA公寓.您没有在STA的线程上运行WinForms或WPF核心消息循环StaTaskScheduler.也就是说,你没有使用任何类似的东西Application.Run,Application.DoEvents或者Dispatcher.PushFrame在那个线程中.如果这是一个错误的假设,请纠正我.

它本身StaTaskScheduler 不会在它创建的STA线程上安装任何同步上下文.因此,您依靠CLR为您提供消息.我只是发现一个隐含的确认,即CLR泵在STA线程上,在CLR中的公寓和泵浦中由Chris Brumme:

我一直说托管阻塞会在STA线程上调用时执行"一些抽取".要确切知道什么会被泵送,这不是很好吗?不幸的是,抽水是一种超越凡人理解的黑色艺术.在Win2000及以上版本中,我们只需委托OLE32的CoWaitForMultipleHandles服务.

这表明CLR在CoWaitForMultipleHandles内部用于STA线程.此外,MSDN的COWAIT_DISPATCH_WINDOW_MESSAGES标志文件提到了这一点:

...在STA中只发送了一小组特殊的消息.

我做了一些研究,但无法WM_TEST从你的示例代码中抽出来CoWaitForMultipleHandles,我们在你的问题的评论中讨论过.我的理解是,上面提到的一小组特殊情况的消息 实际上仅限于某些特定于COM编组的消息,并且不包括像你这样的任何常规通用消息WM_TEST.

那么,回答你的问题:

...我是否应该实现自定义同步上下文,它将使用CoWaitForMultipleHandles显式地泵送消息,并将其安装在由StaTaskScheduler启动的每个STA线程上?

是的,我相信创建自定义同步上下文和覆盖SynchronizationContext.Wait确实是正确的解决方案.

但是,您应该避免使用CoWaitForMultipleHandles,而是使用MsgWaitForMultipleObjectsEx.如果MsgWaitForMultipleObjectsEx指示队列中有待处理的消息,则应使用PeekMessage(PM_REMOVE)和手动将其泵出DispatchMessage.然后你应该继续等待句柄,所有这些都在同一个SynchronizationContext.Wait调用中.

请注意,和之间有一个微妙但重要的区别.如果在队列中已经看到消息(例如,带有或者),则后者不会返回并保持阻塞,但不会被删除.这对于抽水来说并不好,因为您的COM对象可能正在使用类似于检查消息队列的东西.这可能会导致在不期望时阻止.MsgWaitForMultipleObjectsExMsgWaitForMultipleObjectsPeekMessage(PM_NOREMOVE)GetQueueStatusPeekMessageMsgWaitForMultipleObjects

OTOH,MsgWaitForMultipleObjectsEx带有MWMO_INPUTAVAILABLE旗帜没有这样的缺点,并且在这种情况下会返回.

不久之前,我创建了一个自定义版本StaTaskScheduler(此处可用ThreadAffinityTaskScheduler),试图解决另一个问题:维护一个线程池,该线程池具有线程关联性,用于后续的await延续.如果跨多个使用STA COM对象,则线程关联性至关重要awaits.原始版本StaTaskScheduler仅在其池限制为1个线程时才显示此行为.

所以我继续前进,并对你的WM_TEST案子进行了更多的实验.最初,我SynchronizationContext在STA线程上安装了标准类的实例.该WM_TEST消息没有得到抽,这是预期.

然后我重写SynchronizationContext.Wait,只是转发它SynchronizationContext.WaitHelper.确实有人打电话,但仍然没有打气.

最后,我实现了一个功能齐全的消息泵循环,这是它的核心部分:

// the core loop
var msg = new NativeMethods.MSG();
while (true)
{
    // MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns,
    // even if there's a message already seen but not removed in the message queue
    nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx(
        count, waitHandles,
        (uint)remainingTimeout,
        QS_MASK,
        NativeMethods.MWMO_INPUTAVAILABLE);

    if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult)
        return managedResult;

    // there is a message, pump and dispatch it
    if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE))
    {
        NativeMethods.TranslateMessage(ref msg);
        NativeMethods.DispatchMessage(ref msg);
    }
    if (hasTimedOut())
        return WaitHandle.WaitTimeout;
}
Run Code Online (Sandbox Code Playgroud)

这确实有效,WM_TEST被抽水了.以下是您的测试的改编版本:

public static async Task RunAsync()
{
    using (var staThread = new Noseratio.ThreadAffinity.ThreadWithAffinityContext(staThread: true, pumpMessages: true))
    {
        Console.WriteLine("Initial thread #" + Thread.CurrentThread.ManagedThreadId);
        await staThread.Run(async () =>
        {
            Console.WriteLine("On STA thread #" + Thread.CurrentThread.ManagedThreadId);
            // create a simple Win32 window
            IntPtr hwnd = CreateTestWindow();

            // Post some WM_TEST messages
            Console.WriteLine("Post some WM_TEST messages...");
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(1), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(2), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(3), IntPtr.Zero);
            Console.WriteLine("Press Enter to continue...");
            await ReadLineAsync();

            Console.WriteLine("After await, thread #" + Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine("Pending messages in the queue: " + (NativeMethods.GetQueueStatus(0x1FF) >> 16 != 0));

            Console.WriteLine("Exiting STA thread #" + Thread.CurrentThread.ManagedThreadId);
        }, CancellationToken.None);
    }
    Console.WriteLine("Current thread #" + Thread.CurrentThread.ManagedThreadId);
}
Run Code Online (Sandbox Code Playgroud)

输出:

Initial thread #9
On STA thread #10
Post some WM_TEST messages...
Press Enter to continue...
WM_TEST processed: 1
WM_TEST processed: 2
WM_TEST processed: 3

After await, thread #10
Pending messages in the queue: False
Exiting STA thread #10
Current thread #12
Press any key to exit

请注意,此实现支持线程关联(它保留在线程#10之后await)和消息泵送.完整的源代码包含可重用的部分(ThreadAffinityTaskSchedulerThreadWithAffinityContext),可在此处作为独立的控制台应用程序使用.它尚未经过彻底测试,因此使用它需要您自担风险.

  • 非常感谢Noseratio!这是第一个可靠的方式来管理到目前为止我已经看到的COM互操作,即不容易颠覆整个编程模型的拳头.惊人! (2认同)

Han*_*ant 16

STA线程泵送的主题很大,很少有程序员有一个愉快的时间解决死锁.关于它的开创性论文是由Chris Brumme撰写的,他是一位从事.NET工作的主要聪明人.你会在这篇博文中找到它.不幸的是,它的具体细节相当短暂,他并没有超出注意到CLR做了一些抽水但没有任何关于确切规则的细节.

他正在谈论的代码,在.NET 2.0中添加,存在于名为MsgWaitHelper()的内部CLR函数中..NET 2.0的源代码可通过SSCLI20发行版获得.非常完整,但不包括MsgWaitHelper()的来源.很不寻常.反编译它是一个失败的原因,它非常大.

从他的博客文章中拿走的一件事就是重新入侵的危险.在STA线程中抽取是危险的,因为它能够分派Windows消息并在程序未处于正确状态时执行任意代码以允许执行此类代码.大多数VB6程序员在使用DoEvents()在代码中获取模态循环以停止冻结UI时所知道的东西.我写了一篇关于其最典型危险的帖子.MsgWaitHelper()这是否确切类型抽自己的,它不过是究竟非常有选择性什么样的代码允许运行.

通过在没有附加调试器的情况下运行程序然后附加非托管调试器,您可以深入了解它在测试程序中的作用.你会在NtWaitForMultipleObjects()上看到它阻塞.我更进了一步,在PeekMessageW()上设置了一个断点,以获得这个堆栈跟踪:

user32.dll!PeekMessageW()   Unknown
combase.dll!CCliModalLoop::MyPeekMessage(tagMSG * pMsg, HWND__ * hwnd, unsigned int min, unsigned int max, unsigned short wFlag) Line 2305  C++
combase.dll!CCliModalLoop::PeekRPCAndDDEMessage() Line 2008 C++
combase.dll!CCliModalLoop::FindMessage(unsigned long dwStatus) Line 2087    C++
combase.dll!CCliModalLoop::HandleWakeForMsg() Line 1707 C++
combase.dll!CCliModalLoop::BlockFn(void * * ahEvent, unsigned long cEvents, unsigned long * lpdwSignaled) Line 1645 C++
combase.dll!ClassicSTAThreadWaitForHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * pdwIndex) Line 46 C++
combase.dll!CoWaitForMultipleHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * lpdwindex) Line 120 C++
clr.dll!MsgWaitHelper(int,void * *,int,unsigned long,int)   Unknown
clr.dll!Thread::DoAppropriateWaitWorker(int,void * *,int,unsigned long,enum WaitMode)   Unknown
clr.dll!Thread::DoAppropriateWait(int,void * *,int,unsigned long,enum WaitMode,struct PendingSync *)    Unknown
clr.dll!CLREventBase::WaitEx(unsigned long,enum WaitMode,struct PendingSync *)  Unknown
clr.dll!CLREventBase::Wait(unsigned long,int,struct PendingSync *)  Unknown
clr.dll!Thread::Block(int,struct PendingSync *) Unknown
clr.dll!SyncBlock::Wait(int,int)    Unknown
clr.dll!ObjectNative::WaitTimeout(bool,int,class Object *)  Unknown
Run Code Online (Sandbox Code Playgroud)

请注意,我在Windows 8.1上记录了这个堆栈跟踪,它在旧版Windows上看起来会有很大不同.在Windows 8中,COM模态循环已经大量修改,它对WinRT程序来说也是一个非常大的问题.不知道那么多,但它似乎有另一个名为ASTA的STA线程模型进行更严格的抽取,包含在添加的CoWaitForMultipleObjects()中

ObjectNative :: WaitTimeout()是BlockingCollection.Take()方法中的SemaphoreSlim.Wait()开始执行CLR代码的地方.你看到它通过内部CLR代码的级别来达到神秘的MsgWaitHelper()函数,然后切换到臭名昭着的COM模态调度程序循环.

它在你的程序中执行"错误"抽样的蝙蝠信号标志是对CliModalLoop :: PeekRPCAndDDEMessage()方法的调用.换句话说,它只考虑发布到特定内部窗口的互操作消息的类型,该窗口调度跨越公寓边界的COM调用.它不会为您自己的窗口抽取消息队列中的消息.

这是可以理解的行为,Windows只能绝对确定当它可以看到你的UI线程空闲时,重新入侵不会杀死你的程序.当它泵送消息循环本身时它是空闲的,对PeekMessage()或GetMessage()的调用指示该状态.问题是,你不要自己抽水.你违反了STA线程的核心合同,它必须抽取消息循环.希望COM模态循环能为你做抽水,这是无望的希望.

你可以解决这个问题,即使我不建议你这样做.CLR将把它留给应用程序本身,以通过正确构造的SynchronizationContext.Current对象执行等待.您可以通过派生自己的类来创建一个并重写Wait()方法.调用SetWaitNotificationRequired()方法来说服CLR它应该由你决定.一个不完整的版本,演示了该方法:

class MySynchronizationProvider : System.Threading.SynchronizationContext {
    public MySynchronizationProvider() {
        base.SetWaitNotificationRequired();
    }
    public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout) {
        for (; ; ) {
            int result = MsgWaitForMultipleObjects(waitHandles.Length, waitHandles, waitAll, millisecondsTimeout, 8);
            if (result == waitHandles.Length) System.Windows.Forms.Application.DoEvents();
            else return result;
        }
    }
    [DllImport("user32.dll")]
    private static extern int MsgWaitForMultipleObjects(int cnt, IntPtr[] waitHandles, bool waitAll,
        int millisecondTimeout, int mask);        
}
Run Code Online (Sandbox Code Playgroud)

并在线程的开头安装它:

    System.ComponentModel.AsyncOperationManager.SynchronizationContext =
        new MySynchronizationProvider();
Run Code Online (Sandbox Code Playgroud)

您现在将看到已分派WM_TEST消息.它调用分派它的Application.DoEvents().我可以通过使用PeekMessage + DispatchMessage来掩盖它,但这会混淆这段代码的危险,最好不要在表下粘贴DoEvents().你真的在这里玩一个非常危险的重入游戏.不要使用此代码.

简而言之,正确使用StaThreadScheduler的唯一希望就是在已经实现STA合同的代码中使用它时,像STA线程那样的泵应该这样做.它真的意味着旧代码的创可贴,你不必奢侈地控制线程状态.就像在VB6程序或Office加载项中开始生活的任何代码一样.尝试一下,我不认为它实际上可以工作.值得注意的是,asych/await的可用性应该完全消除对它的需求.