nos*_*tio 14 .net c# asynchronous task-parallel-library async-await
我有以下场景,我认为这可能很常见:
有一个任务(一个UI命令处理程序)可以同步或异步完成.
命令的到达速度可能比处理它们的速度快.
如果命令已有待处理任务,则应对新命令处理程序任务进行排队并按顺序处理.
每个新任务的结果可能取决于前一个任务的结果.
应该遵守取消,但为了简单起见,我想将其排除在本问题的范围之外.此外,线程安全(并发)不是必需的,但必须支持重入.
这是我想要实现的基本示例(作为控制台应用程序,为简单起见):
using System;
using System.Threading.Tasks;
namespace ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var asyncOp = new AsyncOp<int>();
Func<int, Task<int>> handleAsync = async (arg) =>
{
Console.WriteLine("this task arg: " + arg);
//await Task.Delay(arg); // make it async
return await Task.FromResult(arg); // sync
};
Console.WriteLine("Test #1...");
asyncOp.RunAsync(() => handleAsync(1000));
asyncOp.RunAsync(() => handleAsync(900));
asyncOp.RunAsync(() => handleAsync(800));
asyncOp.CurrentTask.Wait();
Console.WriteLine("\nPress any key to continue to test #2...");
Console.ReadLine();
asyncOp.RunAsync(() =>
{
asyncOp.RunAsync(() => handleAsync(200));
return handleAsync(100);
});
asyncOp.CurrentTask.Wait();
Console.WriteLine("\nPress any key to exit...");
Console.ReadLine();
}
// AsyncOp
class AsyncOp<T>
{
Task<T> _pending = Task.FromResult(default(T));
public Task<T> CurrentTask { get { return _pending; } }
public Task<T> RunAsync(Func<Task<T>> handler)
{
var pending = _pending;
Func<Task<T>> wrapper = async () =>
{
// await the prev task
var prevResult = await pending;
Console.WriteLine("\nprev task result: " + prevResult);
// start and await the handler
return await handler();
};
_pending = wrapper();
return _pending;
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
输出:
Test #1... prev task result: 0 this task arg: 1000 prev task result: 1000 this task arg: 900 prev task result: 900 this task arg: 800 Press any key to continue to test #2... prev task result: 800 prev task result: 800 this task arg: 200 this task arg: 100 Press any key to exit...
它按照要求工作,直到在测试#2中引入重新引入:
asyncOp.RunAsync(() =>
{
asyncOp.RunAsync(() => handleAsync(200));
return handleAsync(100);
});
Run Code Online (Sandbox Code Playgroud)
所需的输出应该是100,200而不是200,100因为有已经是一个悬而未决外任务100.这显然是因为内部任务同步执行,打破var pending = _pending; /* ... */ _pending = wrapper()了外部任务的逻辑.
如何使其适用于测试#2?
一种解决方案是为每个任务强制执行异步Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext().但是,我不想对可能在内部同步的命令处理程序强加异步执行.此外,我不想依赖于任何特定同步上下文的行为(即依赖于它Task.Factory.StartNew应该在创建的任务实际启动之前返回).
在现实生活中,我负责AsyncOp上面的内容,但无法控制命令处理程序(即内部的任何内容handleAsync).
nos*_*tio 12
我几乎忘记了可以Task手动构建,而无需启动或安排它.然后,"Task.Factory.StartNew"vs"new Task(...).Start"让我回到正轨.我认为这是Task<TResult>构造函数实际上可能有用的少数情况之一,以及嵌套的任务(Task<Task<T>>)和Task.Unwrap():
// AsyncOp
class AsyncOp<T>
{
Task<T> _pending = Task.FromResult(default(T));
public Task<T> CurrentTask { get { return _pending; } }
public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
{
var pending = _pending;
Func<Task<T>> wrapper = async () =>
{
// await the prev task
var prevResult = await pending;
Console.WriteLine("\nprev task result: " + prevResult);
// start and await the handler
return await handler();
};
var task = new Task<Task<T>>(wrapper);
var inner = task.Unwrap();
_pending = inner;
task.RunSynchronously(useSynchronizationContext ?
TaskScheduler.FromCurrentSynchronizationContext() :
TaskScheduler.Current);
return inner;
}
}
Run Code Online (Sandbox Code Playgroud)
输出:
Test #1... prev task result: 0 this task arg: 1000 prev task result: 1000 this task arg: 900 prev task result: 900 this task arg: 800 Press any key to continue to test #2... prev task result: 800 this task arg: 100 prev task result: 100 this task arg: 200
如果需要,现在AsyncOp通过添加lock保护来实现线程安全也很容易_pending.
更新,下面是此模式的最新版本,它使用Task并且是线程安全的:
// AsyncOp
class AsyncOp<T>
{
Task<T> _pending = Task.FromResult(default(T));
public Task<T> CurrentTask { get { return _pending; } }
public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
{
var pending = _pending;
Func<Task<T>> wrapper = async () =>
{
// await the prev task
var prevResult = await pending;
Console.WriteLine("\nprev task result: " + prevResult);
// start and await the handler
return await handler();
};
var task = new Task<Task<T>>(wrapper);
var inner = task.Unwrap();
_pending = inner;
task.RunSynchronously(useSynchronizationContext ?
TaskScheduler.FromCurrentSynchronizationContext() :
TaskScheduler.Current);
return inner;
}
}
Run Code Online (Sandbox Code Playgroud)