我想运行一堆异步任务,并限制在任何给定时间可以完成的任务数量.
假设您有1000个网址,并且您只希望一次打开50个请求; 但只要一个请求完成,您就会打开与列表中下一个URL的连接.这样,一次只打开50个连接,直到URL列表用完为止.
如果可能的话,我也想利用给定数量的线程.
我提出了一种扩展方法,ThrottleTasksAsync可以实现我想要的功能.那里有更简单的解决方案吗?我认为这是一种常见的情况.
用法:
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
Console.WriteLine("Press a key to exit...");
Console.ReadKey(true);
}
}
Run Code Online (Sandbox Code Playgroud)
这是代码:
static class IEnumerableExtensions
{
public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
{
var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
var semaphore = new SemaphoreSlim(maxConcurrentTasks);
// Run the throttler on a separate thread.
var t = Task.Run(() => …Run Code Online (Sandbox Code Playgroud) 我正在编写一个Windows服务,它将启动多个工作线程,这些线程将监听Amazon SQS队列并处理消息.将有大约20个线程监听10个队列.
线程必须始终运行,这就是为什么我倾向于实际使用实际线程作为工作循环而不是线程池线程.
这是一个顶级实现.Windows服务将启动多个工作线程,每个线程都会监听它的队列和处理消息.
protected override void OnStart(string[] args)
{
for (int i = 0; i < _workers; i++)
{
new Thread(RunWorker).Start();
}
}
Run Code Online (Sandbox Code Playgroud)
这是工作的实施
public async void RunWorker()
{
while(true)
{
// .. get message from amazon sqs sync.. about 20ms
var message = sqsClient.ReceiveMessage();
try
{
await PerformWebRequestAsync(message);
await InsertIntoDbAsync(message);
}
catch(SomeExeception)
{
// ... log
//continue to retry
continue;
}
sqsClient.DeleteMessage();
}
}
Run Code Online (Sandbox Code Playgroud)
我知道我可以使用Task.Run执行相同的操作并在线程池线程上执行它而不是启动单个线程,但我没有看到原因,因为每个线程将始终运行.
你看到这个实现有什么问题吗?让线程始终以这种方式运行是多么可靠,我该怎么做才能确保每个线程始终在运行?