use*_*019 2 c# asynchronous task-parallel-library async-await pause
我正在使用一个任务来进行长时间运行的异步处理操作,我希望能够在任意时刻暂停和恢复。对我来说幸运的是,Microsoft 的 TPL 自己的作者之一已经提出了这个问题的解决方案。唯一的麻烦是他的解决方案不能正常工作。
当您删除await Task.Delay(100)下面代码中的 时,代码将在第一个请求之后停止接受暂停请求。如果 的值可信的话,代码似乎SomeMethodAsync会在与其他任务相同的线程上恢复执行。Thread.CurrentThread.ManagedThreadId的输出还SomeMethodAsync表明它正在多个线程上运行。
我一直发现 TPL 相当混乱且难以使用,而 async/await 更是如此,所以我很难理解这里到底发生了什么。如果有人能解释的话,我将非常感激。
极简示例代码:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace PauseTokenTest {
class Program {
static void Main() {
var pts = new PauseTokenSource();
Task.Run(() =>
{
while (true) {
Console.ReadLine();
Console.WriteLine(
$"{Thread.CurrentThread.ManagedThreadId}: Pausing task");
pts.IsPaused = !pts.IsPaused;
}
});
SomeMethodAsync(pts.Token).Wait();
}
public static async Task SomeMethodAsync(PauseToken pause) {
for (int i = 0; ; i++) {
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: {i}");
// Comment this out and repeatedly pausing and resuming will no longer work.
await Task.Delay(100);
await pause.WaitWhilePausedAsync();
}
}
}
public class PauseTokenSource {
internal static readonly Task s_completedTask =
Task.FromResult(true);
volatile TaskCompletionSource<bool> m_paused;
public bool IsPaused {
get { return m_paused != null; }
set {
if (value) {
Interlocked.CompareExchange(
ref m_paused, new TaskCompletionSource<bool>(), null);
} else {
while (true) {
var tcs = m_paused;
if (tcs == null) return;
if (Interlocked.CompareExchange(ref m_paused, null, tcs) == tcs) {
tcs.SetResult(true);
break;
}
}
}
}
}
public PauseToken Token { get { return new PauseToken(this); } }
internal Task WaitWhilePausedAsync() {
var cur = m_paused;
return cur != null ? cur.Task : s_completedTask;
}
}
public struct PauseToken {
readonly PauseTokenSource m_source;
internal PauseToken(PauseTokenSource source) {
m_source = source;
}
public bool IsPaused {
get { return m_source != null && m_source.IsPaused; }
}
public Task WaitWhilePausedAsync() {
return IsPaused ? m_source.WaitWhilePausedAsync() :
PauseTokenSource.s_completedTask;
}
}
}
Run Code Online (Sandbox Code Playgroud)
发生这种情况是因为任务延续的执行方式。在框架的许多地方,任务延续都会尽可能同步执行。这并不总是可能的,但它确实变得常见的一个地方是线程池线程,这些线程通常被视为可交换的。
await是使用同步延续的地方之一;据我所知,这没有在任何地方正式记录,但我在我的博客上描述了这种行为。
因此,本质上发生的事情是:SomeMethodAsync暂停(从- 没有上下文await返回的任务),它将其余部分附加为该任务的延续。然后,当(线程池)线程切换时,它完成从 返回的任务。然后,运行时查看与该延续关联的上下文(无),以便该延续被视为与任何线程池线程兼容。嘿,当前线程是线程池线程!所以它只是直接运行延续。WaitWhilePausedAsyncSomeMethodAsyncTask.RunIsPausedWaitWhilePausedAsync
有趣的是,如果没有另一个 await,该SomeMethodAsync方法将变得完全同步:它总是从 中检索已经完成的任务WaitWhilePausedAsync,因此它只是永远继续执行其无限循环。提醒一下,await其行为与已完成的任务同步(正如我在博客中所述)。这个无限循环在setter内运行IsPaused,因此Task.Run代码永远不会继续循环以ReadLine再次调用。
如果SomeMethodAsync 确实有一个await,那么它可以异步运行,返回到其调用者(完成延续),并允许循环Task.Run继续执行。
正如 Theodor Zoulias 所建议的,将TaskCreationOptions.RunContinuationsAsynchronously标志传递给遗嘱TaskCompletionSource<bool>也是可行的。在这种情况下,任务延续是异步运行的(在单独的线程池线程上),而不是直接在调用线程上执行。
IIRC,您引用的博客文章早于该RunContinuationsAsynchronously标志。它也早于(非通用)TaskCompletionSource.
另外,正如 Theodor Zoulias 所建议的,我的Nito.AsyncExPauseTokenSource库中有一个可以避免此问题的库。它还使用.RunContinuationsAsynchronously
| 归档时间: |
|
| 查看次数: |
118 次 |
| 最近记录: |