Monitor.Pulse/Wait的异步版本

Che*_*oXL 5 c# multithreading asynchronous

我正在尝试优化类似(基本功能)的异步版本到Monitor.Wait和Monitor.Pulse方法.这个想法是通过异步方法使用它.

要求:1)我有一个任务正在运行,它负责等待有人用来监视我的显示器.2)该任务可以计算复杂(即:耗时)操作.同时,脉冲方法可以多次调用而不做任何事情(因为主要任务已经在进行一些处理).3)主任务完成后,它再次开始等待,直到另一个脉冲进入.

最糟糕的情况是Wait> Pulse> Wait> Pulse> Wait ...,但通常每次等待都会有十分之几/几百个脉冲.

所以,我有以下课程(工作,但我认为它可以根据我的要求进行一些优化)

internal sealed class Awaiter
{
    private readonly ConcurrentQueue<TaskCompletionSource<byte>> _waiting = new ConcurrentQueue<TaskCompletionSource<byte>>();

    public void Pulse()
    {
        TaskCompletionSource<byte> tcs;
        if (_waiting.TryDequeue(out tcs))
        {
            tcs.TrySetResult(1);
        }
    }

    public Task Wait()
    {
        TaskCompletionSource<byte> tcs;
        if (_waiting.TryPeek(out tcs))
        {
            return tcs.Task;
        }

        tcs = new TaskCompletionSource<byte>();
        _waiting.Enqueue(tcs);
        return tcs.Task;
    }
}
Run Code Online (Sandbox Code Playgroud)

上述课程的问题是我用来同步的行李.因为我将从一个且只有一个线程等待,所以实际上不需要ConcurrentQueue,因为我总是只有一个项目.

所以,我简化了一下并写了以下内容:

internal sealed class Awaiter2
{
    private readonly object _mutex = new object();
    private TaskCompletionSource<byte> _waiting;

    public void Pulse()
    {
        var w = _waiting;
        if (w == null)
        {
            return;
        }

        lock (_mutex)
        {
            w = _waiting;
            if (w == null)
            {
                return;
            }

            _waiting = null;
            w.TrySetResult(1);
        }
    }

    public Task Wait()
    {
        var w = _waiting;
        if (w != null)
        {
            return w.Task;
        }

        lock (_mutex)
        {
            w = _waiting;
            if (w != null)
            {
                return w.Task;
            }

            w = _waiting = new TaskCompletionSource<byte>();
            return w.Task;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

新版本也正常工作,但我仍然认为可以通过删除锁来进一步优化它.

我正在寻找有关如何优化第二版的建议.有任何想法吗?

Zar*_*rat 5

如果您不需要Wait()返回 a 的调用,Task但满足于能够返回 a,await Wait()那么您可以实现自定义等待者/可等待。

有关编译器使用的等待模式的概述,请参阅此链接。

当实现自定义等待时,您将只与委托打交道,而实际的“等待”则由您决定。当您想要“等待”某个条件时,通常可以保留待处理延续的列表,并且只要条件满足,您就可以调用这些延续。await您只需要处理来自可以从任意线程调用的事实的同步。如果您知道您只能await从一个线程(例如 UI 线程)进行操作,那么您根本不需要任何同步!

我将尝试为您提供无锁实现,但不保证它是正确的。如果您不明白为什么所有竞争条件都是安全的,则不应使用它并使用锁定语句或您知道如何调试的其他技术来实现异步/等待协议。

public sealed class AsyncMonitor
{
    private PulseAwaitable _currentWaiter;

    public AsyncMonitor()
    {
        _currentWaiter = new PulseAwaitable();
    }

    public void Pulse()
    {
        // Optimize for the case when calling Pulse() when nobody is waiting.
        //
        // This has an inherent race condition when calling Pulse() and Wait()
        // at the same time. The question this was written for did not specify
        // how to resolve this, so it is a valid answer to tolerate either
        // result and just allow the race condition.
        //
        if (_currentWaiter.HasWaitingContinuations)
            Interlocked.Exchange(ref _currentWaiter, new PulseAwaitable()).Complete();
    }

    public PulseAwaitable Wait()
    {
        return _currentWaiter;
    }
}

// This class maintains a list of waiting continuations to be executed when
// the owning AsyncMonitor is pulsed.
public sealed class PulseAwaitable : INotifyCompletion
{
    // List of pending 'await' delegates.
    private Action _pendingContinuations;

    // Flag whether we have been pulsed. This is the primary variable
    // around which we build the lock free synchronization.
    private int _pulsed;

    // AsyncMonitor creates instances as required.
    internal PulseAwaitable()
    {
    }

    // This check has a race condition which is tolerated.
    // It is used to optimize for cases when the PulseAwaitable has no waiters.
    internal bool HasWaitingContinuations
    {
        get { return Volatile.Read(ref _pendingContinuations) != null; }
    }

    // Called by the AsyncMonitor when it is pulsed.
    internal void Complete()
    {
        // Set pulsed flag first because that is the variable around which
        // we build the lock free protocol. Everything else this method does
        // is free to have race conditions.
        Interlocked.Exchange(ref _pulsed, 1);

        // Execute pending continuations. This is free to race with calls
        // of OnCompleted seeing the pulsed flag first.
        Interlocked.Exchange(ref _pendingContinuations, null)?.Invoke();
    }

    #region Awaitable

    // There is no need to separate the awaiter from the awaitable
    // so we use one class to implement both parts of the protocol.
    public PulseAwaitable GetAwaiter()
    {
        return this;
    }

    #endregion

    #region Awaiter

    public bool IsCompleted
    {
        // The return value of this property does not need to be up to date so we could omit the 'Volatile.Read' if we wanted to.
        // What is not allowed is returning "true" even if we are not completed, but this cannot happen since we never transist back to incompleted.
        get { return Volatile.Read(ref _pulsed) == 1; }
    }

    public void OnCompleted(Action continuation)
    {
        // Protected against manual invocations. The compiler-generated code never passes null so you can remove this check in release builds if you want to.
        if (continuation == null)
            throw new ArgumentNullException(nameof(continuation));

        // Standard pattern of maintaining a lock free immutable variable: read-modify-write cycle.
        // See for example here: https://blogs.msdn.microsoft.com/oldnewthing/20140516-00/?p=973
        // Again the 'Volatile.Read' is not really needed since outdated values will be detected at the first iteration.
        var oldContinuations = Volatile.Read(ref _pendingContinuations);
        for (;;)
        {
            var newContinuations = (oldContinuations + continuation);
            var actualContinuations = Interlocked.CompareExchange(ref _pendingContinuations, newContinuations, oldContinuations);
            if (actualContinuations == oldContinuations)
                break;

            oldContinuations = actualContinuations;
        }

        // Now comes the interesting part where the actual lock free synchronization happens.
        // If we are completed then somebody needs to clean up remaining continuations.
        // This happens last so the first part of the method can race with pulsing us.
        if (IsCompleted)
            Interlocked.Exchange(ref _pendingContinuations, null)?.Invoke();
    }

    public void GetResult()
    {
        // This is just to check against manual calls. The compiler will never call this when IsCompleted is false.
        // (Assuming your OnCompleted implementation is bug-free and you don't execute continuations before IsCompleted becomes true.)
        if (!IsCompleted)
            throw new NotSupportedException("Synchronous waits are not supported. Use 'await' or OnCompleted to wait asynchronously");
    }

    #endregion
}
Run Code Online (Sandbox Code Playgroud)

您通常不必担心延续在哪个线程上运行,因为如果它们是异步方法,编译器已经插入代码(在延续中)以切换回正确的线程,无需在每个可等待的实现中手动执行此操作。

[编辑]

作为锁定实现的起点,我将提供一个使用锁定语句的实现。用自旋锁或其他锁定技术替换它应该很容易。通过使用结构体作为等待对象,它甚至具有除了初始对象之外不进行额外分配的优点。(当然,调用方的编译器魔法中的 async/await 框架中有分配,但您无法摆脱这些。)

请注意,迭代计数器只会针对每个等待+脉冲对递增,并最终溢出为负值,但这没关系。我们只需要桥接从被调用的延续到它可以调用 GetResult 的时间。40 亿个 Wait+Pulse 对应该足以让任何挂起的延续调用其 GetResult 方法。如果您不想冒这种风险,您可以使用 long 或 Guid 来实现更独特的迭代计数器,但恕我直言, int 几乎适用于所有场景。

public sealed class AsyncMonitor
{
    public struct Awaitable : INotifyCompletion
    {
        // We use a struct to avoid allocations. Note that this means the compiler will copy
        // the struct around in the calling code when doing 'await', so for your own debugging
        // sanity make all variables readonly.
        private readonly AsyncMonitor _monitor;
        private readonly int _iteration;

        public Awaitable(AsyncMonitor monitor)
        {
            lock (monitor)
            {
                _monitor = monitor;
                _iteration = monitor._iteration;
            }
        }

        public Awaitable GetAwaiter()
        {
            return this;
        }

        public bool IsCompleted
        {
            get
            {
                // We use the iteration counter as an indicator when we should be complete.
                lock (_monitor)
                {
                    return _monitor._iteration != _iteration;
                }
            }
        }

        public void OnCompleted(Action continuation)
        {
            // The compiler never passes null, but someone may call it manually.
            if (continuation == null)
                throw new ArgumentNullException(nameof(continuation));

            lock (_monitor)
            {
                // Not calling IsCompleted since we already have a lock.
                if (_monitor._iteration == _iteration)
                {
                    _monitor._waiting += continuation;

                    // null the continuation to indicate the following code
                    // that we completed and don't want it executed.
                    continuation = null;
                }
            }

            // If we were already completed then we didn't null the continuation.
            // (We should invoke the continuation outside of the lock because it
            // may want to Wait/Pulse again and we want to avoid reentrancy issues.)
            continuation?.Invoke();
        }

        public void GetResult()
        {
            lock (_monitor)
            {
                // Not calling IsCompleted since we already have a lock.
                if (_monitor._iteration == _iteration)
                    throw new NotSupportedException("Synchronous wait is not supported. Use await or OnCompleted.");
            }
        }
    }

    private Action _waiting;
    private int _iteration;

    public AsyncMonitor()
    {
    }

    public void Pulse(bool executeAsync)
    {
        Action execute = null;

        lock (this)
        {
            // If nobody is waiting we don't need to increment the iteration counter.
            if (_waiting != null)
            {
                _iteration++;
                execute = _waiting;
                _waiting = null;
            }
        }

        // Important: execute the callbacks outside the lock because they might Pulse or Wait again.
        if (execute != null)
        {
            // If the caller doesn't want inlined execution (maybe he holds a lock)
            // then execute it on the thread pool.
            if (executeAsync)
                Task.Run(execute);
            else
                execute();
        }
    }

    public Awaitable Wait()
    {
        return new Awaitable(this);
    }
}
Run Code Online (Sandbox Code Playgroud)


Sco*_*ain 1

因为您只有一项任务正在等待,所以您的函数可以简化为

internal sealed class Awaiter3
{
    private volatile TaskCompletionSource<byte> _waiting;

    public void Pulse()
    {
        var w = _waiting;
        if (w == null)
        {
            return;
        }
        _waiting = null;
#if NET_46_OR_GREATER
        w.TrySetResult(1);
#else
        Task.Run(() => w.TrySetResult(1));
#endif

    }

    //This method is not thread safe and can only be called by one thread at a time.
    // To make it thread safe put a lock around the null check and the assignment,
    // you do not need to have a lock on Pulse, "volatile" takes care of that side.
    public Task Wait()
    {
        if(_waiting != null)
            throw new InvalidOperationException("Only one waiter is allowed to exist at a time!");

#if NET_46_OR_GREATER
        _waiting = new TaskCompletionSource<byte>(TaskCreationOptions.RunContinuationsAsynchronously);
#else
        _waiting = new TaskCompletionSource<byte>();
#endif
        return _waiting.Task;
    }
}
Run Code Online (Sandbox Code Playgroud)

我确实改变了一种行为。如果您使用的是 .NET 4.6 或更高版本,请使用块中的代码#if NET_46_OR_GREATER,如果低于则使用 else 块。当您调用时,TrySetResult您可以同步运行延续,这可能会导致Pulse()需要很长时间才能完成。通过TaskCreationOptions.RunContinuationsAsynchronously在 .NET 4.6 中使用或将 4.6 之前的版本包装TrySetResult在 a 中Task.Run,将确保Puse()任务的继续不会被阻止。

请参阅 SO 问题在编译时检测目标框架版本,了解如何创建NET_46_OR_GREATER适用于代码的定义。