Ste*_*oix 38 .net c# async-await
我有一些方法Task<T>
可以随意返回await
.我想让这些任务在自定义TaskScheduler
而不是默认的上执行.
var task = GetTaskAsync ();
await task;
Run Code Online (Sandbox Code Playgroud)
我知道我可以创建一个新的TaskFactory (new CustomScheduler ())
并StartNew ()
从中做一个,但是StartNew ()
需要一个动作并创建它Task
,我已经拥有了Task
(在幕后由a返回TaskCompletionSource
)
我怎样才能指定我自己TaskScheduler
的await
?
Ste*_*ary 41
我认为你真正想要的是做一个Task.Run
,但使用自定义调度程序.StartNew
使用异步方法无法直观地工作; 斯蒂芬Toub有一个伟大的博客文章之间的差异Task.Run
和TaskFactory.StartNew
.
因此,要创建自己的自定义Run
,您可以执行以下操作:
private static readonly TaskFactory myTaskFactory = new TaskFactory(
CancellationToken.None, TaskCreationOptions.DenyChildAttach,
TaskContinuationOptions.None, new MyTaskScheduler());
private static Task RunOnMyScheduler(Func<Task> func)
{
return myTaskFactory.StartNew(func).Unwrap();
}
private static Task<T> RunOnMyScheduler<T>(Func<Task<T>> func)
{
return myTaskFactory.StartNew(func).Unwrap();
}
private static Task RunOnMyScheduler(Action func)
{
return myTaskFactory.StartNew(func);
}
private static Task<T> RunOnMyScheduler<T>(Func<T> func)
{
return myTaskFactory.StartNew(func);
}
Run Code Online (Sandbox Code Playgroud)
然后,您可以在自定义调度程序上执行同步或异步方法.
该TaskCompletionSource<T>.Task
构造无任何动作和调度分配在第一次调用ContinueWith(...)
(从异步编程与反应框架和任务并行库-第3部分).
值得庆幸的是,您可以通过实现自己的类派生来稍微自定义await行为INotifyCompletion
,然后以类似于await SomeTask.ConfigureAwait(false)
配置任务应该在OnCompleted(Action continuation)
方法中开始使用的调度程序的模式中使用它(来自等待任何事情;).
这是用法:
TaskCompletionSource<object> source = new TaskCompletionSource<object>();
public async Task Foo() {
// Force await to schedule the task on the supplied scheduler
await SomeAsyncTask().ConfigureScheduler(scheduler);
}
public Task SomeAsyncTask() { return source.Task; }
Run Code Online (Sandbox Code Playgroud)
以下是ConfigureScheduler
使用Task扩展方法的简单实现,其中重要部分包括OnCompleted
:
public static class TaskExtension {
public static CustomTaskAwaitable ConfigureScheduler(this Task task, TaskScheduler scheduler) {
return new CustomTaskAwaitable(task, scheduler);
}
}
public struct CustomTaskAwaitable {
CustomTaskAwaiter awaitable;
public CustomTaskAwaitable(Task task, TaskScheduler scheduler) {
awaitable = new CustomTaskAwaiter(task, scheduler);
}
public CustomTaskAwaiter GetAwaiter() { return awaitable; }
public struct CustomTaskAwaiter : INotifyCompletion {
Task task;
TaskScheduler scheduler;
public CustomTaskAwaiter(Task task, TaskScheduler scheduler) {
this.task = task;
this.scheduler = scheduler;
}
public void OnCompleted(Action continuation) {
// ContinueWith sets the scheduler to use for the continuation action
task.ContinueWith(x => continuation(), scheduler);
}
public bool IsCompleted { get { return task.IsCompleted; } }
public void GetResult() { }
}
}
Run Code Online (Sandbox Code Playgroud)
这是一个可以作为控制台应用程序编译的工作示例:
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace Example {
class Program {
static TaskCompletionSource<object> source = new TaskCompletionSource<object>();
static TaskScheduler scheduler = new CustomTaskScheduler();
static void Main(string[] args) {
Console.WriteLine("Main Started");
var task = Foo();
Console.WriteLine("Main Continue ");
// Continue Foo() using CustomTaskScheduler
source.SetResult(null);
Console.WriteLine("Main Finished");
}
public static async Task Foo() {
Console.WriteLine("Foo Started");
// Force await to schedule the task on the supplied scheduler
await SomeAsyncTask().ConfigureScheduler(scheduler);
Console.WriteLine("Foo Finished");
}
public static Task SomeAsyncTask() { return source.Task; }
}
public struct CustomTaskAwaitable {
CustomTaskAwaiter awaitable;
public CustomTaskAwaitable(Task task, TaskScheduler scheduler) {
awaitable = new CustomTaskAwaiter(task, scheduler);
}
public CustomTaskAwaiter GetAwaiter() { return awaitable; }
public struct CustomTaskAwaiter : INotifyCompletion {
Task task;
TaskScheduler scheduler;
public CustomTaskAwaiter(Task task, TaskScheduler scheduler) {
this.task = task;
this.scheduler = scheduler;
}
public void OnCompleted(Action continuation) {
// ContinueWith sets the scheduler to use for the continuation action
task.ContinueWith(x => continuation(), scheduler);
}
public bool IsCompleted { get { return task.IsCompleted; } }
public void GetResult() { }
}
}
public static class TaskExtension {
public static CustomTaskAwaitable ConfigureScheduler(this Task task, TaskScheduler scheduler) {
return new CustomTaskAwaitable(task, scheduler);
}
}
public class CustomTaskScheduler : TaskScheduler {
protected override IEnumerable<Task> GetScheduledTasks() { yield break; }
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; }
protected override void QueueTask(Task task) {
TryExecuteTask(task);
}
}
}
Run Code Online (Sandbox Code Playgroud)
无法将丰富的异步功能嵌入到自定义TaskScheduler
. 这个类的设计没有考虑async
/ await
。使用自定义的标准方法TaskScheduler
是作为Task.Factory.StartNew
方法的参数。此方法不理解异步委托。可以提供异步委托,但它被视为返回某些结果的任何其他委托。要获得异步委托的实际等待结果,必须调用Unwrap()
返回的任务。
不过这不是问题。问题在于TaskScheduler
基础架构不会将异步委托视为单个工作单元。每个任务被拆分成多个小任务(使用每个await
作为分隔符),每个小任务单独处理。这严重限制了可以在此类之上实现的异步功能。作为一个例子,这里是一个习惯TaskScheduler
,旨在一次排队一个提供的任务(换句话说,限制并发):
public class MyTaskScheduler : TaskScheduler
{
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);
protected async override void QueueTask(Task task)
{
await _semaphore.WaitAsync();
try
{
await Task.Run(() => base.TryExecuteTask(task));
await task;
}
finally
{
_semaphore.Release();
}
}
protected override bool TryExecuteTaskInline(Task task,
bool taskWasPreviouslyQueued) => false;
protected override IEnumerable<Task> GetScheduledTasks() { yield break; }
}
Run Code Online (Sandbox Code Playgroud)
本SemaphoreSlim
应确保只有一个Task
会同时运行。不幸的是它不起作用。信号量过早释放,因为Task
传入的调用QueueTask(task)
不是代表异步委托全部工作的任务,而只是第一个await
. 其他部分传递给TryExecuteTaskInline
方法。无法关联这些任务部分,因为没有提供标识符或其他机制。以下是实践中发生的事情:
var taskScheduler = new MyTaskScheduler();
var tasks = Enumerable.Range(1, 5).Select(n => Task.Factory.StartNew(async () =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Item {n} Started");
await Task.Delay(1000);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Item {n} Finished");
}, default, TaskCreationOptions.None, taskScheduler))
.Select(t => t.Unwrap())
.ToArray();
Task.WaitAll(tasks);
Run Code Online (Sandbox Code Playgroud)
输出:
05:29:58.346项目1入门
05:29:58.358项目2入门
05:29:58.358项目3入门
05:29:58.358项目4入门
05:29:58.358项目5入门
05:29:59.358项目1成品
05: 29:59.374 项目 5 已完成
05:29:59.374 项目 4 已完成
05:29:59.374 项目 2 已完成
05:29:59.374 项目 3 已完成
灾难,所有任务同时排队。
结论:TaskScheduler
当需要高级异步功能时,自定义类不是要走的路。
更新:这是另一个观察,关于TaskScheduler
在环境中存在的custom SynchronizationContext
。该await
机制默认捕获 currentSynchronizationContext
或 current TaskScheduler
,并在捕获的上下文或调度程序上调用延续。如果两者都存在,SynchronizationContext
则首选电流,而TaskScheduler
忽略电流。下面是这种行为的演示,在 WinForms 应用程序中¹:
private async void Button1_Click(object sender, EventArgs e)
{
await Task.Factory.StartNew(async () =>
{
MessageBox.Show($"{Thread.CurrentThread.ManagedThreadId}, {TaskScheduler.Current}");
await Task.Delay(1000);
MessageBox.Show($"{Thread.CurrentThread.ManagedThreadId}, {TaskScheduler.Current}");
}, default, TaskCreationOptions.None,
TaskScheduler.FromCurrentSynchronizationContext()).Unwrap();
}
Run Code Online (Sandbox Code Playgroud)
单击该按钮会依次弹出两条消息,其中包含以下信息:
1、System.Threading.Tasks.SynchronizationContextTaskScheduler
1、System.Threading.Tasks.ThreadPoolTaskScheduler
这个实验表明,只有异步委托的第一部分,第一个之前的部分await
,被调度到非默认调度器上。这种行为进一步限制了 custom TaskScheduler
s 在启用 async/await 的环境中的实际用途。
¹ Windows 窗体应用程序会在调用WindowsFormsSynchronizationContext
该Application.Run
方法时自动安装。
在注释之后,您似乎想要控制等待后运行代码的调度程序。
默认情况下,编译会从在当前 SynchronizationContext 上运行的等待创建延续。因此,最好的办法是在调用之前设置SynchronizationContext
等待。
有一些方法可以等待特定的上下文。有关如何实现此类内容的更多信息,请参阅Jon Skeet 的配置等待,特别是有关 SwitchTo 的部分。
编辑:TaskEx 中的 SwitchTo 方法已被删除,因为它太容易被误用。有关原因,请参阅MSDN 论坛。
归档时间: |
|
查看次数: |
25504 次 |
最近记录: |