分区:如何在每个分区后添加等待

Nem*_*emo 1 c# parallel-processing rate-limiting task-parallel-library

我有一个每分钟接受 20 个请求的 API,之后我需要等待 1 分钟才能查询它。我有一个项目列表(通常超过 1000 个),我需要从 API 查询其详细信息,我的想法是我可以用来将Partitioner我的列表划分为 20 个项目/请求,但很快我意识到这Partitioner不起作用,我的第二个想法在分区中添加 adelay但这也是一个坏主意,根据我的理解,它会在每个不需要的请求之后添加一个延迟,相反,我需要在每个Partition. 下面是我的代码:

public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
    [Optional] int delay)
{
    var whenAll = await Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run(async delegate {
            var allResponses = new List<V>();
            using (partition)
                while (partition.MoveNext())
                {
                    allResponses.Add(await body(partition.Current));
                    await Task.Delay(TimeSpan.FromSeconds(delay));
                }
            return allResponses;
        }, token));
    return whenAll.SelectMany(x => x);
}
Run Code Online (Sandbox Code Playgroud)

有谁知道我怎样才能做到这一点?

The*_*ias 8

您可以使用以下类RateLimiter来限制异步操作​​的频率。这是这个RateLimiter答案中找到的类的更简单的实现。

/// <summary>
/// Limits the number of workers that can access a resource, during the specified
/// time span.
/// </summary>
public class RateLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        // Schedule the release of the semaphore using a Timer.
        // Use the newly created Timer object as the state object, to prevent GC.
        // Handle the unlikely case that the _timeUnit is invalid.
        System.Threading.Timer timer = new(_ => _semaphore.Release());
        try { timer.Change(_timeUnit, Timeout.InfiniteTimeSpan); }
        catch { _semaphore.Release(); throw; }
    }
}
Run Code Online (Sandbox Code Playgroud)

使用示例:

List<string> urls = GetUrls();

using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));

string[] documents = await Task.WhenAll(urls.Select(async url =>
{
    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(url);
}));
Run Code Online (Sandbox Code Playgroud)

在线演示

Timer这个特定的构造函数构造的,以防止它在触发之前被垃圾收集,正如Nick H 的回答中所解释的那样。

注意:此实现存在轻微漏洞,因为它创建了内部一次性System.Threading.Timer对象,而当您使用完RateLimiter. 任何活动的计时器都会阻止RateLimiter垃圾收集,直到这些计时器触发其回调。也SemaphoreSlim没有按应有的方式进行处理。这些都是小缺陷,不太可能影响仅创建少数RateLimiters 的程序。如果您打算创建很多它们,您可以查看此答案的第三版RateLimiter,其中包含基于该Task.Delay方法的一次性版本。


这是该类的另一种实现RateLimiter,更复杂,它基于属性Environment.TickCount64而不是SemaphoreSlim. 它的优点是不会在后台创建“即发即忘”计时器。缺点是该WaitAsync方法不支持CancellationToken参数,并且由于复杂性,出现错误的可能性较高。

public class RateLimiter
{
    private readonly Queue<long> _queue;
    private readonly int _maxActionsPerTimeUnit;
    private readonly int _timeUnitMilliseconds;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        // Arguments validation omitted
        _queue = new Queue<long>();
        _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
        _timeUnitMilliseconds = checked((int)timeUnit.TotalMilliseconds);
    }

    public Task WaitAsync()
    {
        int delayMilliseconds = 0;
        lock (_queue)
        {
            long currentTimestamp = Environment.TickCount64;
            while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
            {
                _queue.Dequeue();
            }
            if (_queue.Count >= _maxActionsPerTimeUnit)
            {
                long refTimestamp = _queue
                    .Skip(_queue.Count - _maxActionsPerTimeUnit).First();
                delayMilliseconds = checked((int)(refTimestamp - currentTimestamp));
                Debug.Assert(delayMilliseconds >= 0);
                if (delayMilliseconds < 0) delayMilliseconds = 0; // Just in case
            }
            _queue.Enqueue(currentTimestamp + delayMilliseconds
                + _timeUnitMilliseconds);
        }
        if (delayMilliseconds == 0) return Task.CompletedTask;
        return Task.Delay(delayMilliseconds);
    }
}
Run Code Online (Sandbox Code Playgroud)