使用Rx和SelectMany限制并发请求

Sup*_*JMN 3 .net c# concurrency reactive-programming system.reactive

我有一个我想要同时下载的页面的URL列表HttpClient.URL列表可能很大(100或更多!)

我目前有这个代码:

var urls = new List<string>
            {
                @"http:\\www.amazon.com",
                @"http:\\www.bing.com",
                @"http:\\www.facebook.com",
                @"http:\\www.twitter.com",
                @"http:\\www.google.com"
            };

var client = new HttpClient();

var contents = urls
    .ToObservable()
    .SelectMany(uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)));

contents.Subscribe(Console.WriteLine);
Run Code Online (Sandbox Code Playgroud)

问题是:由于使用的原因SelectMany,几乎同时创建了大量的任务.似乎如果URL列表足够大,很多任务会给出超时(我得到"任务被取消"例外).

所以,我认为应该有一种方法,可能使用某种调度程序,来限制并发任务的数量,在给定时间不允许超过5或6.

通过这种方式,我可以获得并发下载而无需启动太多可能会失速的任务,就像他们现在所做的那样.

如何做到这一点,我不会因为大量的超时任务而饱和?

十分感谢.

Dor*_*rus 13

记住SelectMany()其实是Select().Merge().虽然SelectMany没有maxConcurrent参数,但是Merge().所以你可以使用它.

从您的示例中,您可以执行以下操作:

var urls = new List<string>
    {
        @"http:\\www.amazon.com",
        @"http:\\www.bing.com",
        @"http:\\www.facebook.com",
        @"http:\\www.twitter.com",
        @"http:\\www.google.com"
    };

var client = new HttpClient();

var contents = urls
    .ToObservable()
    .Select(uri => Observable.FromAsync(() => client.GetStringAsync(uri)))
    .Merge(2); // 2 maximum concurrent requests!

contents.Subscribe(Console.WriteLine);
Run Code Online (Sandbox Code Playgroud)