对HttpClient请求进行速率限制的简单方法

Jac*_*cob 9 c# async-await

我在System.Net.Http中使用HTTPClient来对API发出请求.API限制为每秒10个请求.

我的代码大致是这样的:

    List<Task> tasks = new List<Task>();
    items..Select(i => tasks.Add(ProcessItem(i));

    try
    {
        await Task.WhenAll(taskList.ToArray());
    }
    catch (Exception ex)
    {
    }
Run Code Online (Sandbox Code Playgroud)

ProcessItem方法做了一些事情,但总是使用以下方法调用API : await SendRequestAsync(..blah). 看起来像:

private async Task<Response> SendRequestAsync(HttpRequestMessage request, CancellationToken token)
{    
    token.ThrowIfCancellationRequested();
    var response = await HttpClient
        .SendAsync(request: request, cancellationToken: token).ConfigureAwait(continueOnCapturedContext: false);

    token.ThrowIfCancellationRequested();
    return await Response.BuildResponse(response);
}
Run Code Online (Sandbox Code Playgroud)

最初代码工作正常,但是当我开始使用Task.WhenAll时,我开始从API获得"超出速率限制"消息.如何限制请求的速率?

值得注意的是,ProcessItem可以根据项目进行1-4次API调用.

Ste*_*ary 6

API限制为每秒10个请求.

然后让您的代码执行一批10个请求,确保它们至少花费一秒钟:

Items[] items = ...;

int index = 0;
while (index < items.Length)
{
  var timer = Task.Delay(TimeSpan.FromSeconds(1.2)); // ".2" to make sure
  var tasks = items.Skip(index).Take(10).Select(i => ProcessItemsAsync(i));
  var tasksAndTimer = tasks.Concat(new[] { timer });
  await Task.WhenAll(tasksAndTimer);
  index += 10;
}
Run Code Online (Sandbox Code Playgroud)

更新

我的ProcessItems方法根据项目进行1-4次API调用.

在这种情况下,批处理不是一个合适的解决方案.您需要将异步方法限制为某个数字,这意味着a SemaphoreSlim.棘手的部分是你希望随着时间的推移允许更多的电话.

我没试过这个代码,但总的想法,我会跟去是有释放的信号周期函数高达 10倍.所以,像这样:

private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(10);

private async Task<Response> ThrottledSendRequestAsync(HttpRequestMessage request, CancellationToken token)
{
  await _semaphore.WaitAsync(token);
  return await SendRequestAsync(request, token);
}

private async Task PeriodicallyReleaseAsync(Task stop)
{
  while (true)
  {
    var timer = Task.Delay(TimeSpan.FromSeconds(1.2));

    if (await Task.WhenAny(timer, stop) == stop)
      return;

    // Release the semaphore at most 10 times.
    for (int i = 0; i != 10; ++i)
    {
      try
      {
        _semaphore.Release();
      }
      catch (SemaphoreFullException)
      {
        break;
      }
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

用法:

// Start the periodic task, with a signal that we can use to stop it.
var stop = new TaskCompletionSource<object>();
var periodicTask = PeriodicallyReleaseAsync(stop.Task);

// Wait for all item processing.
await Task.WhenAll(taskList);

// Stop the periodic task.
stop.SetResult(null);
await periodicTask;
Run Code Online (Sandbox Code Playgroud)


Gab*_*uci 5

答案与类似。

WhenAll使用Parallel.ForEach并使用ParallelOptions将并发任务的数量限制为 10,并确保每个任务至少需要 1 秒,而不是使用任务列表 and :

Parallel.ForEach(
    items,
    new ParallelOptions { MaxDegreeOfParallelism = 10 },
    async item => {
      ProcessItems(item);
      await Task.Delay(1000);
    }
);
Run Code Online (Sandbox Code Playgroud)

或者,如果您想确保每个项目都花费尽可能接近 1 秒的时间:

Parallel.ForEach(
    searches,
    new ParallelOptions { MaxDegreeOfParallelism = 10 },
    async item => {
        var watch = new Stopwatch();
        watch.Start();
        ProcessItems(item);
        watch.Stop();
        if (watch.ElapsedMilliseconds < 1000) await Task.Delay((int)(1000 - watch.ElapsedMilliseconds));
    }
);
Run Code Online (Sandbox Code Playgroud)

或者:

Parallel.ForEach(
    searches,
    new ParallelOptions { MaxDegreeOfParallelism = 10 },
    async item => {
        await Task.WhenAll(
                Task.Delay(1000),
                Task.Run(() => { ProcessItems(item); })
            );
    }
);
Run Code Online (Sandbox Code Playgroud)


Eri*_* J. 1

更新的答案

我的 ProcessItems 方法根据项目进行 1-4 次 API 调用。因此,批量大小为 10 时,我仍然超出了速率限制。

您需要在 SendRequestAsync 中实现滚动窗口。包含每个请求的时间戳的队列是合适的数据结构。您将时间戳早于 10 秒的条目出队。碰巧的是,有一个实现作为对 SO 上类似问题的答案。

原答案

可能对其他人仍然有用

处理此问题的一种简单方法是以 10 个为一组对请求进行批处理,同时运行这些请求,然后等待总共 10 秒过去(如果还没有)。如果这批请求可以在 10 秒内完成,这将使您达到速率限制,但如果这批请求需要更长时间,则效果不佳。查看MoreLinq中的 .Batch() 扩展方法。代码看起来大约像

foreach (var taskList in tasks.Batch(10))
{
    Stopwatch sw = Stopwatch.StartNew(); // From System.Diagnostics
    await Task.WhenAll(taskList.ToArray());
    if (sw.Elapsed.TotalSeconds < 10.0) 
    {
        // Calculate how long you still have to wait and sleep that long
        // You might want to wait 10.5 or 11 seconds just in case the rate
        // limiting on the other side isn't perfectly implemented
    }
}
Run Code Online (Sandbox Code Playgroud)