如何使用await在自定义TaskScheduler上运行任务?

Ste*_*oix 38 .net c# async-await

我有一些方法Task<T>可以随意返回await.我想让这些任务在自定义TaskScheduler而不是默认的上执行.

var task = GetTaskAsync ();
await task;
Run Code Online (Sandbox Code Playgroud)

我知道我可以创建一个新的TaskFactory (new CustomScheduler ())StartNew ()从中做一个,但是StartNew ()需要一个动作并创建它Task,我已经拥有了Task(在幕后由a返回TaskCompletionSource)

我怎样才能指定我自己TaskSchedulerawait

Ste*_*ary 41

我认为你真正想要的是做一个Task.Run,但使用自定义调度程序.StartNew使用异步方法无法直观地工作; 斯蒂芬Toub有一个伟大的博客文章之间的差异Task.RunTaskFactory.StartNew.

因此,要创建自己的自定义Run,您可以执行以下操作:

private static readonly TaskFactory myTaskFactory = new TaskFactory(
    CancellationToken.None, TaskCreationOptions.DenyChildAttach,
    TaskContinuationOptions.None, new MyTaskScheduler());
private static Task RunOnMyScheduler(Func<Task> func)
{
  return myTaskFactory.StartNew(func).Unwrap();
}
private static Task<T> RunOnMyScheduler<T>(Func<Task<T>> func)
{
  return myTaskFactory.StartNew(func).Unwrap();
}
private static Task RunOnMyScheduler(Action func)
{
  return myTaskFactory.StartNew(func);
}
private static Task<T> RunOnMyScheduler<T>(Func<T> func)
{
  return myTaskFactory.StartNew(func);
}
Run Code Online (Sandbox Code Playgroud)

然后,您可以在自定义调度程序上执行同步异步方法.

  • 我很抱歉地说,但是实际执行任务的时间安排在默认的“ ThreadPoolTask​​Scheduler”上,您的“ MyTaskScheduler”仅用于安排内部任务。 (2认同)
  • @springy76:不,这是不正确的。`MyTaskScheduler` 将运行 `func`,并且在异步 `func` 的情况下,默认情况下任何 `await` 都将在 `MyTaskScheduler` 上恢复。 (2认同)
  • 我以“LimitedConcurrencyLevelTask​​Scheduler”为例,将其修改为 LIFO。但是所有任务都按 FIFO 顺序执行并且不受限制,但 `TaskScheduler.Current` 在任何可能的调试点返回我的自定义调度程序。当与 `Parallel.ForEach` 一起使用或使用 `myTaskFactory.StartNew()` 执行真正的 `Func&lt;TResult&gt;` 时,调度程序按预期工作 - 但`Func&lt;Task&lt;TResult&gt;&gt;` 和 `Func&lt;Task&lt;TResult&gt;&gt;` 的组合Unwrap()` 在第一个 `await` 之前甚至没有以正确的顺序和有限的并发执行代码,说调度程序被完全忽略了。 (2认同)
  • @springy76:嗯,我希望有一点奇怪 - 具体来说,发送到线程调度程序的异步方法在运行时“离开” FIFO 队列,当它们准备好在“await”之后恢复时,它们必须重新进入 FIFO 队列(在末尾)。因此,行为不会很明显,但它肯定应该使用该任务调度程序来运行它们。 (2认同)

Ada*_*son 9

TaskCompletionSource<T>.Task构造无任何动作和调度分配在第一次调用ContinueWith(...)(从异步编程与反应框架和任务并行库-第3部分).

值得庆幸的是,您可以通过实现自己的类派生来稍微自定义await行为INotifyCompletion,然后以类似于await SomeTask.ConfigureAwait(false)配置任务应该在OnCompleted(Action continuation)方法中开始使用的调度程序的模式中使用它(来自等待任何事情;).

这是用法:

    TaskCompletionSource<object> source = new TaskCompletionSource<object>();

    public async Task Foo() {
        // Force await to schedule the task on the supplied scheduler
        await SomeAsyncTask().ConfigureScheduler(scheduler);
    }

    public Task SomeAsyncTask() { return source.Task; }
Run Code Online (Sandbox Code Playgroud)

以下是ConfigureScheduler使用Task扩展方法的简单实现,其中重要部分包括OnCompleted:

public static class TaskExtension {
    public static CustomTaskAwaitable ConfigureScheduler(this Task task, TaskScheduler scheduler) {
        return new CustomTaskAwaitable(task, scheduler);
    }
}

public struct CustomTaskAwaitable {
    CustomTaskAwaiter awaitable;

    public CustomTaskAwaitable(Task task, TaskScheduler scheduler) {
        awaitable = new CustomTaskAwaiter(task, scheduler);
    }

    public CustomTaskAwaiter GetAwaiter() { return awaitable; }

    public struct CustomTaskAwaiter : INotifyCompletion {
        Task task;
        TaskScheduler scheduler;

        public CustomTaskAwaiter(Task task, TaskScheduler scheduler) {
            this.task = task;
            this.scheduler = scheduler;
        }

        public void OnCompleted(Action continuation) {
            // ContinueWith sets the scheduler to use for the continuation action
            task.ContinueWith(x => continuation(), scheduler);
        }

        public bool IsCompleted { get { return task.IsCompleted; } }
        public void GetResult() { }
    }
}
Run Code Online (Sandbox Code Playgroud)

这是一个可以作为控制台应用程序编译的工作示例:

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

namespace Example {
    class Program {
        static TaskCompletionSource<object> source = new TaskCompletionSource<object>();
        static TaskScheduler scheduler = new CustomTaskScheduler();

        static void Main(string[] args) {
            Console.WriteLine("Main Started");
            var task = Foo();
            Console.WriteLine("Main Continue ");
            // Continue Foo() using CustomTaskScheduler
            source.SetResult(null);
            Console.WriteLine("Main Finished");
        }

        public static async Task Foo() {
            Console.WriteLine("Foo Started");
            // Force await to schedule the task on the supplied scheduler
            await SomeAsyncTask().ConfigureScheduler(scheduler);
            Console.WriteLine("Foo Finished");
        }

        public static Task SomeAsyncTask() { return source.Task; }
    }

    public struct CustomTaskAwaitable {
        CustomTaskAwaiter awaitable;

        public CustomTaskAwaitable(Task task, TaskScheduler scheduler) {
            awaitable = new CustomTaskAwaiter(task, scheduler);
        }

        public CustomTaskAwaiter GetAwaiter() { return awaitable; }

        public struct CustomTaskAwaiter : INotifyCompletion {
            Task task;
            TaskScheduler scheduler;

            public CustomTaskAwaiter(Task task, TaskScheduler scheduler) {
                this.task = task;
                this.scheduler = scheduler;
            }

            public void OnCompleted(Action continuation) {
                // ContinueWith sets the scheduler to use for the continuation action
                task.ContinueWith(x => continuation(), scheduler);
            }

            public bool IsCompleted { get { return task.IsCompleted; } }
            public void GetResult() { }
        }
    }

    public static class TaskExtension {
        public static CustomTaskAwaitable ConfigureScheduler(this Task task, TaskScheduler scheduler) {
            return new CustomTaskAwaitable(task, scheduler);
        }
    }

    public class CustomTaskScheduler : TaskScheduler {
        protected override IEnumerable<Task> GetScheduledTasks() { yield break; }
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; }
        protected override void QueueTask(Task task) {
            TryExecuteTask(task);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 尽管*可能*这样做,但在生产代码中通常不是一个好主意.最初的CTP有一个`SwitchTo`操作符,其行为方式大致相同,但由于它鼓励非结构化程序流程而被删除,这反过来又导致了复杂性和陷阱.例如,如果在`try`中使用修改后的`await`,那么你的`finally`块必须能够在原始调度程序或修改后的调度程序上执行(并且`finally`代码不能切换调度程序,因为`await`是不允许的). (4认同)

The*_*ias 7

无法将丰富的异步功能嵌入到自定义TaskScheduler. 这个类的设计没有考虑async/ await。使用自定义的标准方法TaskScheduler是作为Task.Factory.StartNew方法的参数。此方法不理解异步委托。可以提供异步委托,但它被视为返回某些结果的任何其他委托。要获得异步委托的实际等待结果,必须调用Unwrap()返回的任务。

不过这不是问题。问题在于TaskScheduler基础架构不会将异步委托视为单个工作单元。每个任务被拆分成多个小任务(使用每个await作为分隔符),每个小任务单独处理。这严重限制了可以在此类之上实现的异步功能。作为一个例子,这里是一个习惯TaskScheduler,旨在一次排队一个提供的任务(换句话说,限制并发):

public class MyTaskScheduler : TaskScheduler
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);

    protected async override void QueueTask(Task task)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() => base.TryExecuteTask(task));
            await task;
        }
        finally
        {
            _semaphore.Release();
        }
    }

    protected override bool TryExecuteTaskInline(Task task,
        bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks() { yield break; }
}
Run Code Online (Sandbox Code Playgroud)

SemaphoreSlim应确保只有一个Task会同时运行。不幸的是它不起作用。信号量过早释放,因为Task传入的调用QueueTask(task)不是代表异步委托全部工作的任务,而只是第一个await. 其他部分传递给TryExecuteTaskInline方法。无法关联这些任务部分,因为没有提供标识符或其他机制。以下是实践中发生的事情:

var taskScheduler = new MyTaskScheduler();
var tasks = Enumerable.Range(1, 5).Select(n => Task.Factory.StartNew(async () =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Item {n} Started");
    await Task.Delay(1000);
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Item {n} Finished");
}, default, TaskCreationOptions.None, taskScheduler))
.Select(t => t.Unwrap())
.ToArray();
Task.WaitAll(tasks);
Run Code Online (Sandbox Code Playgroud)

输出:

05:29:58.346项目1入门
05:29:58.358项目2入门
05:29:58.358项目3入门
05:29:58.358项目4入门
05:29:58.358项目5入门
05:29:59.358项目1成品
05: 29:59.374 项目 5 已完成
05:29:59.374 项目 4 已完成
05:29:59.374 项目 2 已完成
05:29:59.374 项目 3 已完成

灾难,所有任务同时排队。

结论:TaskScheduler当需要高级异步功能时,自定义类不是要走的路。

更新:这是另一个观察,关于TaskScheduler在环境中存在的custom SynchronizationContext。该await机制默认捕获 currentSynchronizationContext或 current TaskScheduler,并在捕获的上下文或调度程序上调用延续。如果两者都存在,SynchronizationContext则首选电流,而TaskScheduler忽略电流。下面是这种行为的演示,在 WinForms 应用程序中¹:

private async void Button1_Click(object sender, EventArgs e)
{
    await Task.Factory.StartNew(async () =>
    {
        MessageBox.Show($"{Thread.CurrentThread.ManagedThreadId}, {TaskScheduler.Current}");
        await Task.Delay(1000);
        MessageBox.Show($"{Thread.CurrentThread.ManagedThreadId}, {TaskScheduler.Current}");
    }, default, TaskCreationOptions.None,
        TaskScheduler.FromCurrentSynchronizationContext()).Unwrap();
}
Run Code Online (Sandbox Code Playgroud)

单击该按钮会依次弹出两条消息,其中包含以下信息:

1、System.Threading.Tasks.SynchronizationContextTaskScheduler

1、System.Threading.Tasks.ThreadPoolTask​​Scheduler

这个实验表明,只有异步委托的第一部分,第一个之前的部分await,被调度到非默认调度器上。这种行为进一步限制了 custom TaskSchedulers 在启用 async/await 的环境中的实际用途。

¹ Windows 窗体应用程序会在调用WindowsFormsSynchronizationContextApplication.Run方法时自动安装。


san*_*ole 4

在注释之后,您似乎想要控制等待后运行代码的调度程序。

默认情况下,编译会从在当前 SynchronizationContext 上运行的等待创建延续。因此,最好的办法是在调用之前设置SynchronizationContext等待。

有一些方法可以等待特定的上下文。有关如何实现此类内容的更多信息,请参阅Jon Skeet 的配置等待,特别是有关 SwitchTo 的部分。

编辑:TaskEx 中的 SwitchTo 方法已被删除,因为它太容易被误用。有关原因,请参阅MSDN 论坛。