相关疑难解决方法(0)

异步/等待最大并发http请求数

当Async/Await发出http请求时(HttpClient例如使用),默认情况下是否内置了任何限制?

这样的问题意味着连接无限数量的将被打开.我的应用程序在一个循环中执行一批h​​ttp请求,似乎保持在大约50个TCP连接上限.

我最初担心我需要添加SemaphoreSlim限制,但.NET似乎为我做这个.任何人都可以对此有所了解吗?

c# async-await

4
推荐指数
1
解决办法
1238
查看次数

如何限制多个异步任务?

我有一些以下形式的代码:

static async Task DoSomething(int n) 
{
  ...
}

static void RunThreads(int totalThreads, int throttle) 
{
  var tasks = new List<Task>();
  for (var n = 0; n < totalThreads; n++)
  {
    var task = DoSomething(n);
    tasks.Add(task);
  }
  Task.WhenAll(tasks).Wait(); // all threads must complete
}
Run Code Online (Sandbox Code Playgroud)

麻烦的是,如果我没有限制线程,事情就会开始崩溃.现在,我想启动最多的throttle线程,并且仅在旧线程完成时才启动新线程.我尝试过几种方法,迄今为止没有一种方法可行.我遇到的问题包括:

  • tasks集合必须与所有的任务被完全填充,无论是主动还是等待执行,否则最后的.Wait()通话只着眼于它开始与线程.
  • 链接执行似乎需要使用Task.Run()等.但是我需要从一开始就引用每个任务,并且实例化任务似乎会自动启动它,这是我不想要的.

这该怎么做?

c# asynchronous task-parallel-library async-await

3
推荐指数
6
解决办法
4279
查看次数

以最大并发运行多个操作 - 不执行最后 2 个任务

我创建了一个类,它允许我同时运行多个操作以及设置最大并发限制的选项。即,如果我有 100 个操作要做,并且我设置maxCurrency为 10,那么在任何给定时间,最多应该同时运行 10 个操作。最终,所有的操作都应该被执行。

这是代码:

public async Task<IReadOnlyCollection<T>> Run<T>(IEnumerable<Func<CancellationToken, Task<T>>> operations, int maxConcurrency, CancellationToken ct)
{
    using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    var results = new ConcurrentBag<T>();
    var tasks = new List<Task>();
    foreach (var operation in operations)
    {
        await semaphore.WaitAsync(ct).ConfigureAwait(false);

        var task = Task.Factory.StartNew(async () =>
        {
            try
            {
                Debug.WriteLine($"Adding new result");
                var singleResult = await operation(ct).ConfigureAwait(false);
                results.Add(singleResult);
                Debug.WriteLine($"Added {singleResult}");
            }
            finally
            {
                semaphore.Release();
            }
        }, ct);
        tasks.Add(task);
    }
    await Task.WhenAll(tasks).ConfigureAwait(false);

    Debug.WriteLine($"Completed tasks: {tasks.Count(t => …
Run Code Online (Sandbox Code Playgroud)

.net c# asynchronous task async-await

3
推荐指数
1
解决办法
78
查看次数

C# Parallel.ForEach 和 Task.WhenAll 有时返回的值比假设的要少

我有这个:

Parallel.ForEach(numbers, (number) =>
{
    var value = Regex.Replace(number, @"\s+", "%20");

    tasks.Add(client.GetAsync(url + value));
});

await Task.WhenAll(tasks).ConfigureAwait(false);

foreach (var task in tasks)
{
  ...
}
Run Code Online (Sandbox Code Playgroud)

有时在到达 foreach(任务中的 var 任务)时返回较少的任务,但在几次请求后,开始返回所有任务。

我已将 ConfigureAwait 更改为 true,但有时仍会返回较少的任务。

顺便说一句,我使用 Parallel.ForEach,因为每个 client.GetAsync(url + value) 都是对外部 api 的请求,其特殊性在于其 99% 的请求的延迟 SLA 低于 1s

你们能解释一下为什么它有时会返回较少的任务吗?

有没有办法保证总是返回所有任务?

谢谢

c# task async-await parallel-foreach configureawait

3
推荐指数
2
解决办法
148
查看次数

使用“Task.WhenAll”时如何控制线程数

我正在通过异步发出 http get 请求来验证图像 url。使用下面的代码一切正常,但是当我有这么多图像时,我们的防火墙将阻止我的互联网访问,因为同时请求的线程太多。因此,我正在寻找如何限制并发运行线程数的解决方案。我结束了这个线程告诉我使用 SemaphoreSlim但不知何故我无法理解这个想法以及如何实现它?

  • 在添加任务时,SemaphoreSlim wait 或 waitAsnyc(有什么区别?)应该在 foreach 内?我可以像在代码中一样使用 linq 创建任务列表吗?
    • 为什么要使用task.Run?
    • 线程在哪一行执行之后开始?在 task.run 或 task.whenall 之后?

如果这不是最好的方法,请提出更好的方法。我不确定将 MaxDegreeOfParallelism 与 parallel.foreach 一起使用是否也有意义?

  Dim tasks = myImages.Select(Function(x) testUrl_async(x))
  Dim results = Await Task.WhenAll(tasks)

Async Function testUrl_async(ByVal myImage  As image) As Task(Of image)
   Dim myImageurl as string=myImage.imageurl
   myHttpResponse = Await myHttpClient.GetAsync(myImageurl)
    If myHttpResponse.IsSuccessStatusCode Then
        Return myImage
    Else
        Return Nothing
    End If
End Function
Run Code Online (Sandbox Code Playgroud)

vb.net multithreading semaphore async-await

2
推荐指数
1
解决办法
1255
查看次数

在我的asp.net mvc Web应用程序中使用带有WebClient()的Parallel.Foreach有任何缺点或风险吗?

我正在开发一个asp.net MVC-5 Web应用程序,基于我读过的一些文章,我不应该在Web服务器和.net web应用程序中使用并行方法.现在在我的情况下WebClient(),我需要在foreach中发出大约1,500个调用,然后从WebClient()调用中反序列化返回的json对象.使用前的原始代码Parallel.Foreach如下,大约需要15分钟才能完成: -

    public async Task <List<Details2>> Get()
            {       

              try
                {

                    using (WebClient wc = new WebClient()) 
                    {
                        string url = currentURL + "resources?AUTHTOKEN=" + pmtoken;
                        var json = await wc.DownloadStringTaskAsync(url);
                        resourcesinfo = JsonConvert.DeserializeObject<ResourcesInfo>(json);

                    }


                    ForEach( var c in resourcesinfo.operation.Details)
                   {

                        ResourceAccountListInfo resourceAccountListInfo = new ResourceAccountListInfo();
                        using (WebClient wc = new WebClient()) 
                        {

                            string url = currentURL + "resources/" + c.RESOURCEID + "/accounts?AUTHTOKEN=" + pmtoken;
                            string tempurl = url.Trim();



                            var …
Run Code Online (Sandbox Code Playgroud)

.net c# asp.net parallel-processing asp.net-mvc

2
推荐指数
1
解决办法
1567
查看次数

Parallel.ForEach 与异步 lambda 等待所有迭代完成

最近我看到了几个与 Parallel.ForEach 相关的 SO 线程与异步 lambdas 混合,但所有建议的答案都是某种解决方法。

有什么办法可以写:

List<int> list = new List<int>[]();

Parallel.ForEach(arrayValues, async (item) =>
{
  var x = await LongRunningIoOperationAsync(item);
  list.Add(x);
});
Run Code Online (Sandbox Code Playgroud)

如何确保列表将包含在每次迭代中使用 lambda 执行的所有迭代中的所有项目?

Parallel.ForEach 通常如何与异步 lambdas 一起工作,如果它命中等待,它会将其线程移交给下一次迭代吗?

我认为 ParallelLoopResult IsCompleted 字段不正确,因为它会在执行所有迭代时返回 true,无论它们的实际 lambda 作业是否完成?

c# task-parallel-library parallel.foreach

2
推荐指数
2
解决办法
201
查看次数

使用 Task.WhenAll,或不使用 Task.WhenAll

我正在审查一些代码并试图提出一个技术原因,为什么您应该或不应该使用Task.WhenAll(Tasks[])并行进行 Http 调用。Http 调用调用不同的微服务,我猜其中一个调用可能需要也可能不需要一些时间来执行......(我想我对此并不感兴趣)。我正在使用 BenchmarkDotNet 来让我了解是否消耗了更多内存,或者执行时间是否有很大不同。以下是基准的过度简化示例:

[Benchmark]
public async Task<string> Task_WhenAll_Benchmark()
{
  var t1 = Task1();
  var t2 = Task2();

  await Task.WhenAll(t1, t2);

  return $"{t1.Result}===={t2.Result}";
}

[Benchmark]
public async Task<string> Task_KeepItSimple_Benchmark()
{
  return $"{await Task1()}===={await Task2()}";
}
Run Code Online (Sandbox Code Playgroud)

Task1Task2方法真的很简单(我HttpClient在类中有一个静态)

public async Task<string> Task1()
{
  using (var request = await httpClient.GetAsync("http://localhost:8000/1.txt"))
  {
    return $"task{await request.Content.ReadAsStringAsync()}";
  }
}

public async Task<string> Task2()
{
  using (var request = await httpClient.GetAsync("http://localhost:8000/2.txt"))
  {
    return $"task{await request.Content.ReadAsStringAsync()}";
  }
}
Run Code Online (Sandbox Code Playgroud)

还有我的结果 …

c# async-await

2
推荐指数
1
解决办法
138
查看次数

ForEachAsync 与结果

我正在尝试将Stephen Toub 的 ForEachAsync<T>扩展方法更改为返回结果的扩展......

斯蒂芬的扩展:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current); 
        })); 
}
Run Code Online (Sandbox Code Playgroud)

我的方法(不起作用;任务被执行但结果是错误的)

public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
    int degreeOfParallelism, Func<T, Task<TResult>> body)
{
    return Task.WhenAll<TResult>(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run<TResult>(async () = 
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current); // When I "return await",
                        // I get good results but only …
Run Code Online (Sandbox Code Playgroud)

c# concurrency asynchronous task-parallel-library parallel.foreachasync

1
推荐指数
1
解决办法
2390
查看次数

Parallel.ForEach 中的嵌套异步方法

我有一个在其中运行多个异步方法的方法。我必须遍历设备列表,并将设备传递给此方法。我注意到这需要很长时间才能完成,所以我正在考虑使用Parallel.ForEach它可以同时针对多个设备运行这个过程。

假设这是我的方法。

public async Task ProcessDevice(Device device) {
    var dev = await _deviceService.LookupDeviceIndbAsNoTracking(device);

    var result = await DoSomething(dev);
    await DoSomething2(dev);
}
Run Code Online (Sandbox Code Playgroud)

然后 DoSomething2 也调用了一个异步方法。

public async Task DoSomething2(Device dev) {
    foreach(var obj in dev.Objects) {
        await DoSomething3(obj);
    }
}
Run Code Online (Sandbox Code Playgroud)

随着时间的推移,设备列表不断变大,因此该列表增长得越多,程序完成ProcessDevice()对每个设备的运行所需的时间就越长。我想一次处理多个设备。所以我一直在研究使用Parallel.ForEach.

Parallel.ForEach(devices, async device => {
    try {
        await ProcessDevice(device);
    } catch (Exception ex) {
        throw ex;
    }
})
Run Code Online (Sandbox Code Playgroud)

程序似乎在设备完全处理之前完成。我还尝试创建一个任务列表,然后为每个设备添加一个运行 ProcessDevice 的新任务到该列表,然后等待 Task.WhenAll(listOfTasks);

var listOfTasks = new List<Task>();
foreach(var device in devices) {
    var task …
Run Code Online (Sandbox Code Playgroud)

c# parallel-processing task-parallel-library async-await parallel.foreach

1
推荐指数
1
解决办法
179
查看次数

如何将 ActionBlock 的所有异常包装在单个 AggregateException 中

我遇到了 TPL ActionBlock,它对于(限制的)并行异步操作似乎非常方便。到目前为止,我正在使用Task.WhenAll()(+Semaphore进行节流)。当谈到例外时,似乎存在很大差异:

var successList = new List<int>();
var failedList = new List<int>();
try
{
    var actionBlock = new ActionBlock<int>(
        async x => await Task.Run(() =>
        {
            if (x < 5)
            {
                failedList.Add(x);
                throw new Exception(x.ToString());
            }

            successList.Add(x);
        }),
        new ExecutionDataflowBlockOptions());

    Enumerable.Range(1, 10).Each(x => actionBlock.Post(x));
    actionBlock.Complete();

    await actionBlock.Completion.ConfigureAwait(false);
}
catch (Exception ex)
{
    // works for approach using task.whenall
    Console.WriteLine(ex);
    Assert.True(failedList.Count == 4);
    Assert.True(successList.Count == 6);
    return;
}

Assert.Fail();
Run Code Online (Sandbox Code Playgroud)

该测试失败,因为ActionBlock发生异常时立即停止。我发现这是 github 上的一个问题:Dataflow: …

c# exception task-parallel-library tpl-dataflow

1
推荐指数
1
解决办法
452
查看次数

自定义实现的 ForEachParallelAsync 在调用异步方法时切换线程

我有一个 WPF 应用程序,它使用自定义实现的 ForEachAsyncParallel 方法:

public static Task ForEachParallelAsync<T>(this IEnumerable<T> source, Func<T, Task> body, int maxDegreeOfParallelism)
{
      return Task.WhenAll(Partitioner.Create(source).GetPartitions(maxDegreeOfParallelism).Select(partition => Task.Run(async delegate
      {
           using (partition)
           {
               while (partition.MoveNext())
               {
                   await body(partition.Current);
               }
           }
      })));
}
Run Code Online (Sandbox Code Playgroud)

我面临的问题是,当正文包含异步语句并在执行它时,它将线程切换到新线程,这会导致The calling thread cannot access this object because a different thread owns it.

另一种方法是使用 foreach 循环或调度程序(我不想这样做)是否可以指出当前实现的问题?

c# async-await

1
推荐指数
1
解决办法
82
查看次数