标签: parallel.foreachasync

使用 Parallel.ForEachAsync

我正在尝试运行 a Parallel.ForEachAsync(),但出现以下两个错误:

错误 1:
参数 2:无法从 System.Threading.Tasks.ParallelOptions 转换为 System.Threading.CancellationToken
错误 2:
Delegate Func<WC, CancellationToken, ValueTask> 不采用 1 个参数

这是我的代码:

public async Task<List<DisplayDto>> GetData()
{
    var options = new ParallelOptions()
    {
        MaxDegreeOfParallelism = 20;
    };

    await Parallel.ForEachAsync(result, options, async OrderNumber => {
        //Do Stuff here. 
    });
}
Run Code Online (Sandbox Code Playgroud)

为了使其按照我的要求工作,必须改变什么?

c# parallel-processing asynchronous parallel.foreachasync

19
推荐指数
1
解决办法
3万
查看次数

停止 Parallel.ForEachAsync

在 C# 中,我对停止循环感兴趣Parallel.ForEachAsync(考虑之间的差异StopBreak);因为Parallel.ForEach我可以执行以下操作:

Parallel.ForEach(items, (item, state) =>
{
    if (cancellationToken.IsCancellationRequested)
    {
        state.Stop();
        return;
    }

    // some process on the item
    Process(item);
});
Run Code Online (Sandbox Code Playgroud)

但是,由于我有一个需要异步执行的进程,所以我切换到了Parallel.ForEachAsync. ForEachAsync没有该方法Stop(),我可以按break如下方式循环,但我想知道这是否是打破循环的最有效方法(换句话说,循环在收到取消时需要尽快停止要求)。

await Parallel.ForEachAsync(items, async (item, state) =>
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    // some async process on the item
    await ProcessAsync(item);
});
Run Code Online (Sandbox Code Playgroud)

c# asynchronous cancellation parallel.foreach parallel.foreachasync

16
推荐指数
1
解决办法
4597
查看次数

Parallel.ForEachAsync 是否可以替代普通的 for 循环 + 附加到任务列表 (async wait Task.Run)+ WhenAll?

假设我想发出并行 API post 请求。

在 for 循环中,我可以将 http post 调用附加到任务列表中(使用 Task.Run 调用的每个任务),然后等待所有任务完成使用await Task.WhenAll. 因此,在等待网络请求完成时,控制权将交给调用者。实际上,API 请求将并行发出。

同样,我可以使用Parallel.ForEachAsync它将自动执行WhenAll并将控制权返回给调用者。所以我想问是否ForEachAsync可以替换普通的 for 循环列表(async wait Task.Run)和WhenAll

c# asynchronous async-await parallel.foreachasync

8
推荐指数
1
解决办法
5021
查看次数

如何打破 Parallel.ForEachAsync 循环,而不是取消它?

在 .NET 5 中,Parallel.ForEach您可以使用ParallelLoopState.Break()方法来停止处理的额外迭代。允许当前的完成处理。

但是新的 .NET 6Parallel.ForEachAsync没有这个ParallelLoopState类,所以我们不能像使用Parallel.ForEach. 那么有没有办法在 中执行相同的中断功能ForEachAsyncCancellationToken传递给 func 我不认为这是正确的方法,因为您没有尝试取消正在运行的循环,而是阻止额外的迭代开始。

类似于此功能,但对于异步版本:

int count = 0;
Parallel.ForEach(enumerateFiles, new ParallelOptions() { CancellationToken = cancellationToken},
    (file, state) =>
    {
        Interlocked.Increment(ref count);
        if (count >= MaxFilesToProcess)
        {
            state.Break();
        }
...
Run Code Online (Sandbox Code Playgroud)

作为一种解决方法,我可能可以.Take([xx])在将其传递到并行循环之前使用它TSource,但这可能不是打破复杂条件的选项。

c# asynchronous .net-6.0 parallel.foreachasync

6
推荐指数
1
解决办法
3039
查看次数

.NET 6 Parallel.ForEachAsync 中需要两个取消令牌吗?

我正在尝试如何打破循环ForEachAsyncbreak不起作用,但我可以调用CancelCancellationTokenSource。的签名ForEachAsync有两个标记 - 一个作为独立参数,另一个在Func主体签名中。

我注意到,当cts.Cancel()调用时,tokent变量都IsCancellationRequested设置为 true。所以,我的问题是:这两个单独的论点的目的是什么token?有什么值得注意的区别吗?

List<string> symbols = new() { "A", "B", "C" };
var cts = new CancellationTokenSource();
var token = cts.Token;
token.ThrowIfCancellationRequested();

try
{
    await Parallel.ForEachAsync(symbols, token, async (symbol, t) =>
    {
        if (await someConditionAsync())
        {
            cts.Cancel();
        }
    });
catch (OperationCanceledException oce)
{
    Console.WriteLine($"Stopping parallel loop: {oce}");
}
finally
{
    cts.Dispose();
}
Run Code Online (Sandbox Code Playgroud)

c# cancellation cancellation-token parallel.foreachasync

4
推荐指数
1
解决办法
3446
查看次数

从 IEnumerable&lt;Task&lt;T&gt;&gt; 到 IAsyncEnumerable&lt;T&gt; 通过在 Parallel.ForEach/Parallel.ForEachAsync 内返回的yield 给出错误 CS1621

在 .NET 6 项目中,我必须调用一个偏移分页(页/每页)的 Web API,并且我希望尽可能使 n 个调用并行。

这是使用给定页码调用 API 一次的方法:

private Task<ApiResponse> CallApiAsync(int page,
    CancellationToken cancellationToken = default)
{
    return GetFromJsonAsync<ApiResponse>($"...&page={page}", cancellationToken)
        .ConfigureAwait(false);
}
Run Code Online (Sandbox Code Playgroud)

我实际上需要的是从第 1 页到第 n 页的所有 API 调用的仅前向流式迭代器,因此考虑到这一要求,我认为这IAsyncEnumerable是正确的 API,这样我就可以并行触发 API 调用并访问每个 API 响应一旦准备好,就可以完成,而不需要全部完成。

所以我想出了以下代码:

public async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    int numProducts = GetNumberOfProducts(perPage);

    int numCalls = MathExtensions.CeilDiv(numProducts, perPage);

    var pages = Enumerable.Range(1, numCalls);

    Parallel.ForEach(pages, async page => {
        yield return await CallApiAsync(page, cancellationToken).ConfigureAwait(false);
    });

    yield break;
}
Run Code Online (Sandbox Code Playgroud)

但我收到以下错误 …

c# parallel.foreach .net-core iasyncenumerable parallel.foreachasync

3
推荐指数
1
解决办法
300
查看次数

C# .NET 中的 Parallel.ForAsync?

我想在 C# 中异步并行执行 for 循环。但该类Parallel仅包含一个同步For方法和一个异步ForEachAsync方法(.NET 6)。这对我来说似乎是一个疏忽。

方法在哪里Parallel.ForAsync?有什么解决方法吗?

.net c# asynchronous async-await parallel.foreachasync

3
推荐指数
1
解决办法
698
查看次数

.NET 6 并行操作中 MaxDegreeOfParallelism = -1 的含义是什么?

该财产的文件ParallelOptions.MaxDegreeOfParallelism指出:

该属性会影响传递此实例的方法调用MaxDegreeOfParallelism运行的并发操作数。正的属性值将并发操作的数量限制为设定值。如果为-1,则并发运行的操作数没有限制。ParallelParallelOptions

默认情况下,ForForEach利用底层调度程序提供的线程数量,因此更改MaxDegreeOfParallelism默认值只会限制将使用的并发任务数量。

我试图理解“无限制”在这种情况下意味着什么。根据以上文档摘录,我的期望是Parallel.Invoke配置的操作MaxDegreeOfParallelism = -1将立即开始并行执行所有提供的actions. 但事实并非如此。这是一个包含 12 个操作的实验:

int concurrency = 0;
Action action = new Action(() =>
{
    var current = Interlocked.Increment(ref concurrency);
    Console.WriteLine(@$"Started an action at {DateTime
        .Now:HH:mm:ss.fff} on thread #{Thread
        .CurrentThread.ManagedThreadId} with concurrency {current}");
    Thread.Sleep(1000);
    Interlocked.Decrement(ref concurrency);
});
Action[] actions = Enumerable.Repeat(action, 12).ToArray();
var options = new ParallelOptions() { MaxDegreeOfParallelism = -1 };
Parallel.Invoke(options, …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library parallel.foreach .net-6.0 parallel.foreachasync

2
推荐指数
1
解决办法
1550
查看次数

ForEachAsync 与结果

我正在尝试将Stephen Toub 的 ForEachAsync<T>扩展方法更改为返回结果的扩展......

斯蒂芬的扩展:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current); 
        })); 
}
Run Code Online (Sandbox Code Playgroud)

我的方法(不起作用;任务被执行但结果是错误的)

public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
    int degreeOfParallelism, Func<T, Task<TResult>> body)
{
    return Task.WhenAll<TResult>(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run<TResult>(async () = 
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current); // When I "return await",
                        // I get good results but only …
Run Code Online (Sandbox Code Playgroud)

c# concurrency asynchronous task-parallel-library parallel.foreachasync

1
推荐指数
1
解决办法
2390
查看次数

Parallel.ForeachAsync 时是否需要互斥体?

据我了解,Parallel.ForeachAsync 调用中的委托会同时执行多次。

如果该委托操作的变量不是该委托的本地变量怎么办?

假设我在委托中增加了一个静态计数器。我需要通过互斥锁或其他东西来保护计数器吗?

c# concurrency mutex asynchronous parallel.foreachasync

1
推荐指数
1
解决办法
98
查看次数

如何在 Parallel.ForEachAsync 中使用 CancellationTokenSource

拥有这个处理程序:

public async Task<Result> Handle(MyQuery request, CancellationToken cancellationToken)
{
     var cancellationTokenSource = new CancellationTokenSource();

     await Parallel.ForEachAsync(myList, async (objectId, _) =>
     {
         var result = await _service.GetObject(objectId);

         if (result.Any())
         {
             cancellationTokenSource.Cancel();
         }
     });

     if (cancellationTokenSource.IsCancellationRequested) return Result.Fail("Error message.");

     return Result.Ok();
}
Run Code Online (Sandbox Code Playgroud)

这可行,但想知道我CancellationTokenSource在这里使用是否正确?

.net c# async-await cancellationtokensource parallel.foreachasync

1
推荐指数
1
解决办法
61
查看次数