用于单线程异步 async-await 风格编程的信号量

Joh*_*ohn 7 .net asynchronous task-parallel-library async-await

信号量是一种多线程锁定机制,可确保只有有限数量的线程在给定资源上运行。互斥体是一种特殊情况,其中有限数量为 1。

异步编程与多线程编程有很多共同点(有时也有关系),尽管它本身并不是多线程的。

以下代码创建十个任务,只需等待一秒钟并记录其开始和结束。

所有这些都仅在一个线程上执行(我假设已经进行了适当的同步上下文维护,例如 WPF 中的情况)。

因此,即使只有一个线程,我们也有“并行”任务,并且在某些用例中,人们可能希望将对资源的访问限制为仅少数或其中一个任务。(例如,限制并行网络请求。)

看来需要一个“异步信号量”——这个概念不锁定线程,而是锁定异步延续。

我已经实现了这样一个信号量来检查它是否确实有意义并阐明我的意思。

我的问题是:这个东西是否已经可用,最好是在 .NET 框架本身中可用?我找不到任何东西,尽管在我看来它应该存在。

这是代码(LINQPad 在此分享):

    async void Main()
    {
        // Necessary in LINQPad to ensure a single thread.
        // Other environments such as WPF do this for you.
        SynchronizationContext.SetSynchronizationContext(
            new DispatcherSynchronizationContext());

        var tasks = Enumerable.Range(1, 10).Select(SampleWork).ToArray();

        await Task.WhenAll(tasks);

        "All done.".Dump();
    }

    AsyncSemaphore commonSemaphore = new AsyncSemaphore(4);

    async Task SampleWork(Int32 i)
    {
        using (await commonSemaphore.Acquire())
        {
            $"Beginning work #{i} {Thread.CurrentThread.ManagedThreadId}".Dump();

            await Task.Delay(TimeSpan.FromSeconds(1));

            $"Finished work #{i} {Thread.CurrentThread.ManagedThreadId}".Dump();
        }
    }

    public class AsyncSemaphore
    {
        Int32 maxTasks;
        Int32 currentTasks;

        ReleasingDisposable release;

        Queue<TaskCompletionSource<Object>> continuations
            = new Queue<TaskCompletionSource<Object>>();

        public AsyncSemaphore(Int32 maxTasks = 1)
        {
            this.maxTasks = maxTasks;
            release = new ReleasingDisposable(this);
        }

        public async Task<IDisposable> Acquire()
        {
            ++currentTasks;

            if (currentTasks > maxTasks)
            {
                var tcs = new TaskCompletionSource<Object>();

                continuations.Enqueue(tcs);

                await tcs.Task;
            }

            return release;
        }

        void Release()
        {
            --currentTasks;

            if (continuations.Count > 0)
            {
                var tcs = continuations.Dequeue();

                tcs.SetResult(null);
            }
        }

        class ReleasingDisposable : IDisposable
        {
            AsyncSemaphore self;

            public ReleasingDisposable(AsyncSemaphore self) => this.self = self;

            public void Dispose() => self.Release();
        }
    }
Run Code Online (Sandbox Code Playgroud)

我得到这个输出:

Beginning work #1 1
Beginning work #2 1
Beginning work #3 1
Beginning work #4 1
Finished work #4 1
Finished work #3 1
Finished work #2 1
Finished work #1 1
Beginning work #5 1
Beginning work #6 1
Beginning work #7 1
Beginning work #8 1
Finished work #5 1
Beginning work #9 1
Finished work #8 1
Finished work #7 1
Finished work #6 1
Beginning work #10 1
Finished work #9 1
Finished work #10 1
All done.
Run Code Online (Sandbox Code Playgroud)

事实上,我最多有 4 个任务正在运行,并且所有任务都在同一个线程上运行。

Ste*_*ary 4

因此,即使只有一个线程,我们也有“并行”任务

我通常更喜欢“并发”一词,以避免与Parallel/并行 LINQ 混淆。

我的问题是:这个东西是否已经可用,最好是在 .NET 框架本身中可用?

是的。SemaphoreSlim是可以同步异步使用的信号量。

我还在NuGet 上拥有一整套异步协调原语,其灵感来自Stephen Toub 关于该主题的博客文章系列。我的原语都是同步和异步兼容的(并且线程安全),这在例如资源的一个用户是同步的但其他用户是异步的情况下很有用。