具有支持多线程的限制的异步任务的队列

STO*_*STO 9 .net c# multithreading async-await

我需要实现一个库来请求vk.com API.问题是API每秒只支持3个请求.我想让API异步.

重要: API应支持从多个线程安全访问.

我的想法是实现一个名为throttler的类,它允许不超过3个请求/秒并延迟其他请求.

接口是下一个:

public interface IThrottler : IDisposable
{
    Task<TResult> Throttle<TResult>(Func<Task<TResult>> task);
}
Run Code Online (Sandbox Code Playgroud)

用法就像

var audio = await throttler.Throttle(() => api.MyAudio());
var messages = await throttler.Throttle(() => api.ReadMessages());
var audioLyrics = await throttler.Throttle(() => api.AudioLyrics(audioId));
/// Here should be delay because 3 requests executed
var photo = await throttler.Throttle(() => api.MyPhoto());
Run Code Online (Sandbox Code Playgroud)

如何实施throttler?

目前我将其实现为由后台线程处理的队列.

public Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
{
    /// TaskRequest has method Run() to run task
    /// TaskRequest uses TaskCompletionSource to provide new task 
    /// which is resolved when queue processed til this element.
    var request = new TaskRequest<TResult>(task);

    requestQueue.Enqueue(request);

    return request.ResultTask;
}
Run Code Online (Sandbox Code Playgroud)

这是缩短处理队列的后台线程循环的代码:

private void ProcessQueue(object state)
{
    while (true)
    {
        IRequest request;
        while (requestQueue.TryDequeue(out request))
        {
            /// Delay method calculates actual delay value and calls Thread.Sleep()
            Delay();
            request.Run();
        }

    }
}
Run Code Online (Sandbox Code Playgroud)

是否可以在没有后台线程的情况下实现它?

Ser*_*rvy 10

因此,我们将首先解决一个更简单的问题,即创建一个可同时处理多达N个任务的队列,而不是限制每秒启动的N个任务,并在此基础上构建:

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }
    public TaskQueue(int concurrentRequests)
    {
        semaphore = new SemaphoreSlim(concurrentRequests);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我们还将使用以下帮助器方法将a的结果与TaskCompletionSource`Task:

public static void Match<T>(this TaskCompletionSource<T> tcs, Task<T> task)
{
    task.ContinueWith(t =>
    {
        switch (t.Status)
        {
            case TaskStatus.Canceled:
                tcs.SetCanceled();
                break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions);
                break;
            case TaskStatus.RanToCompletion:
                tcs.SetResult(t.Result);
                break;
        }

    });
}

public static void Match<T>(this TaskCompletionSource<T> tcs, Task task)
{
    Match(tcs, task.ContinueWith(t => default(T)));
}
Run Code Online (Sandbox Code Playgroud)

现在,对于我们的实际解决方案,我们可以做的是每次我们需要执行限制操作时,我们创建一个TaskCompletionSource,然后进入我们TaskQueue并添加一个启动任务的项目,将TCS与其结果匹配,不等待它,然后将任务队列延迟1秒钟.然后,任务队列将不允许任务启动,直到在过去的第二秒中不再启动N个任务,而操作的结果本身与create相同Task:

public class Throttler
{
    private TaskQueue queue;
    public Throttler(int requestsPerSecond)
    {
        queue = new TaskQueue(requestsPerSecond);
    }
    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
    public Task Enqueue<T>(Func<Task> taskGenerator)
    {
        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
}
Run Code Online (Sandbox Code Playgroud)

  • @STO不,重要的是不要*使用它.这就是重点.队列仅关注*启动*给定操作后的1秒延迟.它根本不关心操作本身*何时完成*.那是你的前提.因此,只要在过去的第二个任务中启动了N个任务,它就可以确保您无法启动任务,并且让自下一个任务从最早的任务**启动后的1秒开始. (2认同)