基于任务的空闲检测

Ben*_*igt 5 c# timer async-await

想要限制某些事件之间的间隔并且如果超过限制则采取措施并不罕见.例如,网络对等体之间的心跳消息用于检测另一端是否存活.

在C#async/await样式中,可以通过每次心跳到达时替换超时任务来实现:

var client = new TcpClient { ... };
await client.ConnectAsync(...);

Task heartbeatLost = new Task.Delay(HEARTBEAT_LOST_THRESHOLD);
while (...)
{
    Task<int> readTask = client.ReadAsync(buffer, 0, buffer.Length);
    Task first = await Task.WhenAny(heartbeatLost, readTask);
    if (first == readTask) {
        if (ProcessData(buffer, 0, readTask.Result).HeartbeatFound) {
            heartbeatLost = new Task.Delay(HEARTBEAT_LOST_THRESHOLD);
        }
    }
    else if (first == heartbeatLost) {
        TellUserPeerIsDown();
        break;
    }
}
Run Code Online (Sandbox Code Playgroud)

这很方便,但是每个延迟实例都Task拥有一个Timer,并且如果许多心跳包在比阈值更短的时间内到达,那就是很多Timer加载线程池的对象.此外,每个完成都Timer将在线程池上运行代码,无论是否有任何延续仍然链接到它.

你不能通过Timer电话释放旧的heartbeatLost.Dispose(); 这将是一个例外

InvalidOperationException:任务只有在完成状态下才可以处理

可以创建一个CancellationTokenSource并使用它来取消旧的延迟任务,但是当定时器本身具有可重新调度的特征时,创建更多对象来实现这一点似乎是次优的.

什么是集成计时器重新安排的最佳方法,以便代码可以更像这样的结构?

var client = new TcpClient { ... };
await client.ConnectAsync(...);

var idleTimeout = new TaskDelayedCompletionSource(HEARTBEAT_LOST_THRESHOLD);
Task heartbeatLost = idleTimeout.Task;
while (...)
{
    Task<int> readTask = client.ReadAsync(buffer, 0, buffer.Length);
    Task first = await Task.WhenAny(heartbeatLost, readTask);
    if (first == readTask) {
        if (ProcessData(buffer, 0, readTask.Result).HeartbeatFound) {
            idleTimeout.ResetDelay(HEARTBEAT_LOST_THRESHOLD);
        }
    }
    else if (first == heartbeatLost) {
        TellUserPeerIsDown();
        break;
    }
}
Run Code Online (Sandbox Code Playgroud)

Sco*_*ain 4

对我来说似乎很简单,你假设的班级的名称可以让你大部分到达那里。您所需要的只是TaskCompletionSource一个不断重置的计时器。

public class TaskDelayedCompletionSource
{
    private TaskCompletionSource<bool> _completionSource;
    private readonly System.Threading.Timer _timer;
    private readonly object _lockObject = new object();

    public TaskDelayedCompletionSource(int interval)
    {
        _completionSource = CreateCompletionSource();
        _timer = new Timer(OnTimerCallback);
        _timer.Change(interval, Timeout.Infinite);
    }

    private static TaskCompletionSource<bool> CreateCompletionSource()
    {
        return new TaskCompletionSource<bool>(TaskCreationOptions.DenyChildAttach | TaskCreationOptions.RunContinuationsAsynchronously | TaskCreationOptions.HideScheduler);
    }

    private void OnTimerCallback(object state)
    {
        //Cache a copy of the completion source before we entier the lock, so we don't complete the wrong source if ResetDelay is in the middle of being called.
        var completionSource = _completionSource;
        lock (_lockObject)
        {
            completionSource.TrySetResult(true);
        }
    }

    public void ResetDelay(int interval)
    {
        lock (_lockObject)
        {
            var oldSource = _completionSource;
            _timer.Change(interval, Timeout.Infinite);
            _completionSource = CreateCompletionSource();
            oldSource.TrySetCanceled();
        }
    }
    public Task Task => _completionSource.Task;
}
Run Code Online (Sandbox Code Playgroud)

这只会创建一个计时器并更新它,当计时器触发时任务完成。

Task heartbeatLost = idleTimeout.Task;您将需要稍微更改代码,因为每次更新需要将调用放入while 循环内的结束时间时,都会创建一个新的 TaskCompletionSource 。

var client = new TcpClient { ... };
await client.ConnectAsync(...);

var idleTimeout = new TaskDelayedCompletionSource(HEARTBEAT_LOST_THRESHOLD);
while (...)
{
    Task heartbeatLost = idleTimeout.Task;
    Task<int> readTask = client.ReadAsync(buffer, 0, buffer.Length);
    Task first = await Task.WhenAny(heartbeatLost, readTask);
    if (first == readTask) {
        if (ProcessData(buffer, 0, readTask.Result).HeartbeatFound) {
            idleTimeout.ResetDelay(HEARTBEAT_LOST_THRESHOLD);
        }
    }
    else if (first == heartbeatLost) {
        TellUserPeerIsDown();
    }
}
Run Code Online (Sandbox Code Playgroud)

编辑:如果您关注完成源的对象创建(例如,您在游戏引擎中编程,其中 GC 收集是一个很大的问题),您可能可以添加额外的逻辑并重用OnTimerCallback完成ResetDelay源,如果调用尚未发生,并且您确信自己不在重置延迟内。

您可能需要从使用 a 切换lock为 aSemaphoreSlim并将回调更改为

    private void OnTimerCallback(object state)
    {
        if(_semaphore.Wait(0))
        {
            _completionSource.TrySetResult(true);
        }
    }
Run Code Online (Sandbox Code Playgroud)

我可能会稍后更新这个答案以包括OnTimerCallback也会有的内容,但我现在没有时间。