当Async/Await发出http请求时(HttpClient例如使用),默认情况下是否内置了任何限制?
这样的问题这意味着连接无限数量的将被打开.我的应用程序在一个循环中执行一批http请求,似乎保持在大约50个TCP连接上限.
我最初担心我需要添加SemaphoreSlim限制,但.NET似乎为我做这个.任何人都可以对此有所了解吗?
我有一些以下形式的代码:
static async Task DoSomething(int n)
{
...
}
static void RunThreads(int totalThreads, int throttle)
{
var tasks = new List<Task>();
for (var n = 0; n < totalThreads; n++)
{
var task = DoSomething(n);
tasks.Add(task);
}
Task.WhenAll(tasks).Wait(); // all threads must complete
}
Run Code Online (Sandbox Code Playgroud)
麻烦的是,如果我没有限制线程,事情就会开始崩溃.现在,我想启动最多的throttle线程,并且仅在旧线程完成时才启动新线程.我尝试过几种方法,迄今为止没有一种方法可行.我遇到的问题包括:
tasks集合必须与所有的任务被完全填充,无论是主动还是等待执行,否则最后的.Wait()通话只着眼于它开始与线程.Task.Run()等.但是我需要从一开始就引用每个任务,并且实例化任务似乎会自动启动它,这是我不想要的.这该怎么做?
我创建了一个类,它允许我同时运行多个操作以及设置最大并发限制的选项。即,如果我有 100 个操作要做,并且我设置maxCurrency为 10,那么在任何给定时间,最多应该同时运行 10 个操作。最终,所有的操作都应该被执行。
这是代码:
public async Task<IReadOnlyCollection<T>> Run<T>(IEnumerable<Func<CancellationToken, Task<T>>> operations, int maxConcurrency, CancellationToken ct)
{
using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
var results = new ConcurrentBag<T>();
var tasks = new List<Task>();
foreach (var operation in operations)
{
await semaphore.WaitAsync(ct).ConfigureAwait(false);
var task = Task.Factory.StartNew(async () =>
{
try
{
Debug.WriteLine($"Adding new result");
var singleResult = await operation(ct).ConfigureAwait(false);
results.Add(singleResult);
Debug.WriteLine($"Added {singleResult}");
}
finally
{
semaphore.Release();
}
}, ct);
tasks.Add(task);
}
await Task.WhenAll(tasks).ConfigureAwait(false);
Debug.WriteLine($"Completed tasks: {tasks.Count(t => …Run Code Online (Sandbox Code Playgroud) 我有这个:
Parallel.ForEach(numbers, (number) =>
{
var value = Regex.Replace(number, @"\s+", "%20");
tasks.Add(client.GetAsync(url + value));
});
await Task.WhenAll(tasks).ConfigureAwait(false);
foreach (var task in tasks)
{
...
}
Run Code Online (Sandbox Code Playgroud)
有时在到达 foreach(任务中的 var 任务)时返回较少的任务,但在几次请求后,开始返回所有任务。
我已将 ConfigureAwait 更改为 true,但有时仍会返回较少的任务。
顺便说一句,我使用 Parallel.ForEach,因为每个 client.GetAsync(url + value) 都是对外部 api 的请求,其特殊性在于其 99% 的请求的延迟 SLA 低于 1s
你们能解释一下为什么它有时会返回较少的任务吗?
有没有办法保证总是返回所有任务?
谢谢
我正在通过异步发出 http get 请求来验证图像 url。使用下面的代码一切正常,但是当我有这么多图像时,我们的防火墙将阻止我的互联网访问,因为同时请求的线程太多。因此,我正在寻找如何限制并发运行线程数的解决方案。我结束了这个线程告诉我使用 SemaphoreSlim但不知何故我无法理解这个想法以及如何实现它?
如果这不是最好的方法,请提出更好的方法。我不确定将 MaxDegreeOfParallelism 与 parallel.foreach 一起使用是否也有意义?
Dim tasks = myImages.Select(Function(x) testUrl_async(x))
Dim results = Await Task.WhenAll(tasks)
Async Function testUrl_async(ByVal myImage As image) As Task(Of image)
Dim myImageurl as string=myImage.imageurl
myHttpResponse = Await myHttpClient.GetAsync(myImageurl)
If myHttpResponse.IsSuccessStatusCode Then
Return myImage
Else
Return Nothing
End If
End Function
Run Code Online (Sandbox Code Playgroud) 我正在开发一个asp.net MVC-5 Web应用程序,基于我读过的一些文章,我不应该在Web服务器和.net web应用程序中使用并行方法.现在在我的情况下WebClient(),我需要在foreach中发出大约1,500个调用,然后从WebClient()调用中反序列化返回的json对象.使用前的原始代码Parallel.Foreach如下,大约需要15分钟才能完成: -
public async Task <List<Details2>> Get()
{
try
{
using (WebClient wc = new WebClient())
{
string url = currentURL + "resources?AUTHTOKEN=" + pmtoken;
var json = await wc.DownloadStringTaskAsync(url);
resourcesinfo = JsonConvert.DeserializeObject<ResourcesInfo>(json);
}
ForEach( var c in resourcesinfo.operation.Details)
{
ResourceAccountListInfo resourceAccountListInfo = new ResourceAccountListInfo();
using (WebClient wc = new WebClient())
{
string url = currentURL + "resources/" + c.RESOURCEID + "/accounts?AUTHTOKEN=" + pmtoken;
string tempurl = url.Trim();
var …Run Code Online (Sandbox Code Playgroud) 最近我看到了几个与 Parallel.ForEach 相关的 SO 线程与异步 lambdas 混合,但所有建议的答案都是某种解决方法。
有什么办法可以写:
List<int> list = new List<int>[]();
Parallel.ForEach(arrayValues, async (item) =>
{
var x = await LongRunningIoOperationAsync(item);
list.Add(x);
});
Run Code Online (Sandbox Code Playgroud)
如何确保列表将包含在每次迭代中使用 lambda 执行的所有迭代中的所有项目?
Parallel.ForEach 通常如何与异步 lambdas 一起工作,如果它命中等待,它会将其线程移交给下一次迭代吗?
我认为 ParallelLoopResult IsCompleted 字段不正确,因为它会在执行所有迭代时返回 true,无论它们的实际 lambda 作业是否完成?
我正在审查一些代码并试图提出一个技术原因,为什么您应该或不应该使用Task.WhenAll(Tasks[])并行进行 Http 调用。Http 调用调用不同的微服务,我猜其中一个调用可能需要也可能不需要一些时间来执行......(我想我对此并不感兴趣)。我正在使用 BenchmarkDotNet 来让我了解是否消耗了更多内存,或者执行时间是否有很大不同。以下是基准的过度简化示例:
[Benchmark]
public async Task<string> Task_WhenAll_Benchmark()
{
var t1 = Task1();
var t2 = Task2();
await Task.WhenAll(t1, t2);
return $"{t1.Result}===={t2.Result}";
}
[Benchmark]
public async Task<string> Task_KeepItSimple_Benchmark()
{
return $"{await Task1()}===={await Task2()}";
}
Run Code Online (Sandbox Code Playgroud)
Task1和Task2方法真的很简单(我HttpClient在类中有一个静态)
public async Task<string> Task1()
{
using (var request = await httpClient.GetAsync("http://localhost:8000/1.txt"))
{
return $"task{await request.Content.ReadAsStringAsync()}";
}
}
public async Task<string> Task2()
{
using (var request = await httpClient.GetAsync("http://localhost:8000/2.txt"))
{
return $"task{await request.Content.ReadAsStringAsync()}";
}
}
Run Code Online (Sandbox Code Playgroud)
还有我的结果 …
我正在尝试将Stephen Toub 的 ForEachAsync<T>扩展方法更改为返回结果的扩展......
斯蒂芬的扩展:
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
Run Code Online (Sandbox Code Playgroud)
我的方法(不起作用;任务被执行但结果是错误的)
public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
int degreeOfParallelism, Func<T, Task<TResult>> body)
{
return Task.WhenAll<TResult>(
from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
select Task.Run<TResult>(async () =
{
using (partition)
while (partition.MoveNext())
await body(partition.Current); // When I "return await",
// I get good results but only …Run Code Online (Sandbox Code Playgroud) c# concurrency asynchronous task-parallel-library parallel.foreachasync
我有一个在其中运行多个异步方法的方法。我必须遍历设备列表,并将设备传递给此方法。我注意到这需要很长时间才能完成,所以我正在考虑使用Parallel.ForEach它可以同时针对多个设备运行这个过程。
假设这是我的方法。
public async Task ProcessDevice(Device device) {
var dev = await _deviceService.LookupDeviceIndbAsNoTracking(device);
var result = await DoSomething(dev);
await DoSomething2(dev);
}
Run Code Online (Sandbox Code Playgroud)
然后 DoSomething2 也调用了一个异步方法。
public async Task DoSomething2(Device dev) {
foreach(var obj in dev.Objects) {
await DoSomething3(obj);
}
}
Run Code Online (Sandbox Code Playgroud)
随着时间的推移,设备列表不断变大,因此该列表增长得越多,程序完成ProcessDevice()对每个设备的运行所需的时间就越长。我想一次处理多个设备。所以我一直在研究使用Parallel.ForEach.
Parallel.ForEach(devices, async device => {
try {
await ProcessDevice(device);
} catch (Exception ex) {
throw ex;
}
})
Run Code Online (Sandbox Code Playgroud)
程序似乎在设备完全处理之前完成。我还尝试创建一个任务列表,然后为每个设备添加一个运行 ProcessDevice 的新任务到该列表,然后等待 Task.WhenAll(listOfTasks);
var listOfTasks = new List<Task>();
foreach(var device in devices) {
var task …Run Code Online (Sandbox Code Playgroud) c# parallel-processing task-parallel-library async-await parallel.foreach
我遇到了 TPL ActionBlock,它对于(限制的)并行异步操作似乎非常方便。到目前为止,我正在使用Task.WhenAll()(+Semaphore进行节流)。当谈到例外时,似乎存在很大差异:
var successList = new List<int>();
var failedList = new List<int>();
try
{
var actionBlock = new ActionBlock<int>(
async x => await Task.Run(() =>
{
if (x < 5)
{
failedList.Add(x);
throw new Exception(x.ToString());
}
successList.Add(x);
}),
new ExecutionDataflowBlockOptions());
Enumerable.Range(1, 10).Each(x => actionBlock.Post(x));
actionBlock.Complete();
await actionBlock.Completion.ConfigureAwait(false);
}
catch (Exception ex)
{
// works for approach using task.whenall
Console.WriteLine(ex);
Assert.True(failedList.Count == 4);
Assert.True(successList.Count == 6);
return;
}
Assert.Fail();
Run Code Online (Sandbox Code Playgroud)
该测试失败,因为ActionBlock发生异常时立即停止。我发现这是 github 上的一个问题:Dataflow: …
我有一个 WPF 应用程序,它使用自定义实现的 ForEachAsyncParallel 方法:
public static Task ForEachParallelAsync<T>(this IEnumerable<T> source, Func<T, Task> body, int maxDegreeOfParallelism)
{
return Task.WhenAll(Partitioner.Create(source).GetPartitions(maxDegreeOfParallelism).Select(partition => Task.Run(async delegate
{
using (partition)
{
while (partition.MoveNext())
{
await body(partition.Current);
}
}
})));
}
Run Code Online (Sandbox Code Playgroud)
我面临的问题是,当正文包含异步语句并在执行它时,它将线程切换到新线程,这会导致The calling thread cannot access this object because a different thread owns it.
另一种方法是使用 foreach 循环或调度程序(我不想这样做)是否可以指出当前实现的问题?
c# ×11
async-await ×8
asynchronous ×3
.net ×2
task ×2
asp.net ×1
asp.net-mvc ×1
concurrency ×1
exception ×1
semaphore ×1
tpl-dataflow ×1
vb.net ×1