Jef*_*ser 14 .net c# task-parallel-library async-await azureservicebus
我想运行一个具有" 心跳 " 的任务,该任务以特定的时间间隔继续运行,直到任务完成.
我在想这样的扩展方法效果很好:
public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken)
Run Code Online (Sandbox Code Playgroud)
例如:
public class Program {
public static void Main() {
var cancelTokenSource = new CancellationTokenSource();
var cancelToken = cancelTokenSource.Token;
var longRunningTask = Task.Factory.StartNew(SomeLongRunningTask, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
var withHeartbeatTask = longRunningTask.WithHeartbeat(TimeSpan.FromSeconds(1), PerformHeartbeat, cancelToken);
withHeartbeatTask.Wait();
Console.WriteLine("Long running task completed!");
Console.ReadLine()
}
private static void SomeLongRunningTask() {
Console.WriteLine("Starting long task");
Thread.Sleep(TimeSpan.FromSeconds(9.5));
}
private static int _heartbeatCount = 0;
private static void PerformHeartbeat(CancellationToken cancellationToken) {
Console.WriteLine("Heartbeat {0}", ++_heartbeatCount);
}
}
Run Code Online (Sandbox Code Playgroud)
该程序应输出:
Starting long task
Heartbeat 1
Heartbeat 2
Heartbeat 3
Heartbeat 4
Heartbeat 5
Heartbeat 6
Heartbeat 7
Heartbeat 8
Heartbeat 9
Long running task completed!
Run Code Online (Sandbox Code Playgroud)
请注意,它不应(在正常情况下)输出"Heartbeat 10",因为心跳在初始超时(即1秒)后开始.同样,如果任务花费的时间少于心跳间隔,则根本不应发生心跳.
实现这个的好方法是什么?
背景信息:我有一个服务正在侦听Azure Service Bus队列.我想不完成消息(将永久删除它从队列中),直到我完成处理它,这可能需要比最长消息LockDuration 5分钟更长的时间.因此,我需要使用此心跳方法在锁定持续时间到期之前调用RenewLockAsync,以便在进行冗长处理时消息不会超时.
Jef*_*ser 13
这是我的尝试:
public static class TaskExtensions {
/// <summary>
/// Issues the <paramref name="heartbeatAction"/> once every <paramref name="heartbeatInterval"/> while <paramref name="primaryTask"/> is running.
/// </summary>
public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) {
if (cancellationToken.IsCancellationRequested) {
return;
}
var stopHeartbeatSource = new CancellationTokenSource();
cancellationToken.Register(stopHeartbeatSource.Cancel);
await Task.WhenAny(primaryTask, PerformHeartbeats(heartbeatInterval, heartbeatAction, stopHeartbeatSource.Token));
stopHeartbeatSource.Cancel();
}
private static async Task PerformHeartbeats(TimeSpan interval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) {
while (!cancellationToken.IsCancellationRequested) {
try {
await Task.Delay(interval, cancellationToken);
if (!cancellationToken.IsCancellationRequested) {
heartbeatAction(cancellationToken);
}
}
catch (TaskCanceledException tce) {
if (tce.CancellationToken == cancellationToken) {
// Totally expected
break;
}
throw;
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
或者稍微调整一下,你甚至可以像以下一样使心跳异步:
/// <summary>
/// Awaits a fresh Task created by the <paramref name="heartbeatTaskFactory"/> once every <paramref name="heartbeatInterval"/> while <paramref name="primaryTask"/> is running.
/// </summary>
public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Func<CancellationToken, Task> heartbeatTaskFactory, CancellationToken cancellationToken) {
if (cancellationToken.IsCancellationRequested) {
return;
}
var stopHeartbeatSource = new CancellationTokenSource();
cancellationToken.Register(stopHeartbeatSource.Cancel);
await Task.WhenAll(primaryTask, PerformHeartbeats(heartbeatInterval, heartbeatTaskFactory, stopHeartbeatSource.Token));
if (!stopHeartbeatSource.IsCancellationRequested) {
stopHeartbeatSource.Cancel();
}
}
public static Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Func<CancellationToken, Task> heartbeatTaskFactory) {
return WithHeartbeat(primaryTask, heartbeatInterval, heartbeatTaskFactory, CancellationToken.None);
}
private static async Task PerformHeartbeats(TimeSpan interval, Func<CancellationToken, Task> heartbeatTaskFactory, CancellationToken cancellationToken) {
while (!cancellationToken.IsCancellationRequested) {
try {
await Task.Delay(interval, cancellationToken);
if (!cancellationToken.IsCancellationRequested) {
await heartbeatTaskFactory(cancellationToken);
}
}
catch (TaskCanceledException tce) {
if (tce.CancellationToken == cancellationToken) {
// Totally expected
break;
}
throw;
}
}
}
Run Code Online (Sandbox Code Playgroud)
这将允许您将示例代码更改为以下内容:
private static async Task PerformHeartbeat(CancellationToken cancellationToken) {
Console.WriteLine("Starting heartbeat {0}", ++_heartbeatCount);
await Task.Delay(1000, cancellationToken);
Console.WriteLine("Finishing heartbeat {0}", _heartbeatCount);
}
Run Code Online (Sandbox Code Playgroud)
PerformHeartbeat可以替换为RenewLockAsync之类的异步调用,这样您就不必使用Action方法需要的阻塞调用(如RenewLock)来浪费线程时间.
我根据SO指南回答了我自己的问题,但我也对这个问题采取更优雅的方法.
| 归档时间: |
|
| 查看次数: |
9794 次 |
| 最近记录: |