我正在尝试运行 a Parallel.ForEachAsync(),但出现以下两个错误:
错误 1:
参数 2:无法从 System.Threading.Tasks.ParallelOptions 转换为 System.Threading.CancellationToken
错误 2:
Delegate Func<WC, CancellationToken, ValueTask> 不采用 1 个参数
这是我的代码:
public async Task<List<DisplayDto>> GetData()
{
var options = new ParallelOptions()
{
MaxDegreeOfParallelism = 20;
};
await Parallel.ForEachAsync(result, options, async OrderNumber => {
//Do Stuff here.
});
}
Run Code Online (Sandbox Code Playgroud)
为了使其按照我的要求工作,必须改变什么?
在 C# 中,我对停止循环感兴趣Parallel.ForEachAsync(考虑和之间的差异StopBreak);因为Parallel.ForEach我可以执行以下操作:
Parallel.ForEach(items, (item, state) =>
{
if (cancellationToken.IsCancellationRequested)
{
state.Stop();
return;
}
// some process on the item
Process(item);
});
Run Code Online (Sandbox Code Playgroud)
但是,由于我有一个需要异步执行的进程,所以我切换到了Parallel.ForEachAsync. ForEachAsync没有该方法Stop(),我可以按break如下方式循环,但我想知道这是否是打破循环的最有效方法(换句话说,循环在收到取消时需要尽快停止要求)。
await Parallel.ForEachAsync(items, async (item, state) =>
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
// some async process on the item
await ProcessAsync(item);
});
Run Code Online (Sandbox Code Playgroud) c# asynchronous cancellation parallel.foreach parallel.foreachasync
假设我想发出并行 API post 请求。
在 for 循环中,我可以将 http post 调用附加到任务列表中(使用 Task.Run 调用的每个任务),然后等待所有任务完成使用await Task.WhenAll. 因此,在等待网络请求完成时,控制权将交给调用者。实际上,API 请求将并行发出。
同样,我可以使用Parallel.ForEachAsync它将自动执行WhenAll并将控制权返回给调用者。所以我想问是否ForEachAsync可以替换普通的 for 循环列表(async wait Task.Run)和WhenAll?
在 .NET 5 中,Parallel.ForEach您可以使用ParallelLoopState.Break()方法来停止处理的额外迭代。允许当前的完成处理。
但是新的 .NET 6Parallel.ForEachAsync没有这个ParallelLoopState类,所以我们不能像使用Parallel.ForEach. 那么有没有办法在 中执行相同的中断功能ForEachAsync?CancellationToken传递给 func 我不认为这是正确的方法,因为您没有尝试取消正在运行的循环,而是阻止额外的迭代开始。
类似于此功能,但对于异步版本:
int count = 0;
Parallel.ForEach(enumerateFiles, new ParallelOptions() { CancellationToken = cancellationToken},
(file, state) =>
{
Interlocked.Increment(ref count);
if (count >= MaxFilesToProcess)
{
state.Break();
}
...
Run Code Online (Sandbox Code Playgroud)
作为一种解决方法,我可能可以.Take([xx])在将其传递到并行循环之前使用它TSource,但这可能不是打破复杂条件的选项。
我正在尝试如何打破循环ForEachAsync。break不起作用,但我可以调用CancelCancellationTokenSource。的签名ForEachAsync有两个标记 - 一个作为独立参数,另一个在Func主体签名中。
我注意到,当cts.Cancel()调用时,token和t变量都IsCancellationRequested设置为 true。所以,我的问题是:这两个单独的论点的目的是什么token?有什么值得注意的区别吗?
List<string> symbols = new() { "A", "B", "C" };
var cts = new CancellationTokenSource();
var token = cts.Token;
token.ThrowIfCancellationRequested();
try
{
await Parallel.ForEachAsync(symbols, token, async (symbol, t) =>
{
if (await someConditionAsync())
{
cts.Cancel();
}
});
catch (OperationCanceledException oce)
{
Console.WriteLine($"Stopping parallel loop: {oce}");
}
finally
{
cts.Dispose();
}
Run Code Online (Sandbox Code Playgroud) 在 .NET 6 项目中,我必须调用一个偏移分页(页/每页)的 Web API,并且我希望尽可能使 n 个调用并行。
这是使用给定页码调用 API 一次的方法:
private Task<ApiResponse> CallApiAsync(int page,
CancellationToken cancellationToken = default)
{
return GetFromJsonAsync<ApiResponse>($"...&page={page}", cancellationToken)
.ConfigureAwait(false);
}
Run Code Online (Sandbox Code Playgroud)
我实际上需要的是从第 1 页到第 n 页的所有 API 调用的仅前向流式迭代器,因此考虑到这一要求,我认为这IAsyncEnumerable是正确的 API,这样我就可以并行触发 API 调用并访问每个 API 响应一旦准备好,就可以完成,而不需要全部完成。
所以我想出了以下代码:
public async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
int numProducts = GetNumberOfProducts(perPage);
int numCalls = MathExtensions.CeilDiv(numProducts, perPage);
var pages = Enumerable.Range(1, numCalls);
Parallel.ForEach(pages, async page => {
yield return await CallApiAsync(page, cancellationToken).ConfigureAwait(false);
});
yield break;
}
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误 …
c# parallel.foreach .net-core iasyncenumerable parallel.foreachasync
我想在 C# 中异步并行执行 for 循环。但该类Parallel仅包含一个同步For方法和一个异步ForEachAsync方法(.NET 6)。这对我来说似乎是一个疏忽。
方法在哪里Parallel.ForAsync?有什么解决方法吗?
该财产的文件ParallelOptions.MaxDegreeOfParallelism指出:
该属性会影响传递此实例的方法调用
MaxDegreeOfParallelism运行的并发操作数。正的属性值将并发操作的数量限制为设定值。如果为-1,则并发运行的操作数没有限制。ParallelParallelOptions默认情况下,
For将ForEach利用底层调度程序提供的线程数量,因此更改MaxDegreeOfParallelism默认值只会限制将使用的并发任务数量。
我试图理解“无限制”在这种情况下意味着什么。根据以上文档摘录,我的期望是Parallel.Invoke配置的操作MaxDegreeOfParallelism = -1将立即开始并行执行所有提供的actions. 但事实并非如此。这是一个包含 12 个操作的实验:
int concurrency = 0;
Action action = new Action(() =>
{
var current = Interlocked.Increment(ref concurrency);
Console.WriteLine(@$"Started an action at {DateTime
.Now:HH:mm:ss.fff} on thread #{Thread
.CurrentThread.ManagedThreadId} with concurrency {current}");
Thread.Sleep(1000);
Interlocked.Decrement(ref concurrency);
});
Action[] actions = Enumerable.Repeat(action, 12).ToArray();
var options = new ParallelOptions() { MaxDegreeOfParallelism = -1 };
Parallel.Invoke(options, …Run Code Online (Sandbox Code Playgroud) c# task-parallel-library parallel.foreach .net-6.0 parallel.foreachasync
我正在尝试将Stephen Toub 的 ForEachAsync<T>扩展方法更改为返回结果的扩展......
斯蒂芬的扩展:
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
Run Code Online (Sandbox Code Playgroud)
我的方法(不起作用;任务被执行但结果是错误的)
public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
int degreeOfParallelism, Func<T, Task<TResult>> body)
{
return Task.WhenAll<TResult>(
from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
select Task.Run<TResult>(async () =
{
using (partition)
while (partition.MoveNext())
await body(partition.Current); // When I "return await",
// I get good results but only …Run Code Online (Sandbox Code Playgroud) c# concurrency asynchronous task-parallel-library parallel.foreachasync
据我了解,Parallel.ForeachAsync 调用中的委托会同时执行多次。
如果该委托操作的变量不是该委托的本地变量怎么办?
假设我在委托中增加了一个静态计数器。我需要通过互斥锁或其他东西来保护计数器吗?
拥有这个处理程序:
public async Task<Result> Handle(MyQuery request, CancellationToken cancellationToken)
{
var cancellationTokenSource = new CancellationTokenSource();
await Parallel.ForEachAsync(myList, async (objectId, _) =>
{
var result = await _service.GetObject(objectId);
if (result.Any())
{
cancellationTokenSource.Cancel();
}
});
if (cancellationTokenSource.IsCancellationRequested) return Result.Fail("Error message.");
return Result.Ok();
}
Run Code Online (Sandbox Code Playgroud)
这可行,但想知道我CancellationTokenSource在这里使用是否正确?
.net c# async-await cancellationtokensource parallel.foreachasync
c# ×11
asynchronous ×7
async-await ×3
.net ×2
.net-6.0 ×2
cancellation ×2
concurrency ×2
.net-core ×1
mutex ×1