Parallel.ForEach 中的多个异步等待链接

Sou*_*osh 4 c# asynchronous task-parallel-library async-await parallel.foreach

我有一个 Parallel.ForEach 循环,它循环遍历一个集合。在内部,我进行了多次网络 I/O 调用的循环。我使用了 Task.ContinueWith 并嵌套了后续的 async-await 调用。处理的顺序无关紧要,但每个异步调用的数据都应该以同步方式处理。含义 - 对于每次迭代,从第一个异步调用中检索到的数据应该传递给第二个异步调用。在第二个异步调用完成后,来自两个异步调用的数据应该一起处理。

Parallel.ForEach(someCollection, parallelOptions, async (item, state) =>
{
    Task<Country> countryTask = Task.Run(() => GetCountry(item.ID));

    //this is my first async call
    await countryTask.ContinueWith((countryData) =>
    {
        countries.Add(countryData.Result);

        Task<State> stateTask = Task.Run(() => GetState(countryData.Result.CountryID));

        //based on the data I receive in 'stateTask', I make another async call
        stateTask.ContinueWith((stateData) =>
        {
            states.Add(stateData.Result);

            // use data from both the async calls pass it to below function for some calculation
            // in a synchronized way (for a country, its corresponding state should be passed)

            myCollection.ConcurrentAddRange(SomeCalculation(countryData.Result, stateData.Result));
        });
    });
});
Run Code Online (Sandbox Code Playgroud)

我在不使用 continue await 的情况下尝试了上述操作,但它没有以同步方式工作。现在,上面的代码执行完成,但没有处理任何记录。

请问有什么帮助吗?如果我可以添加更多详细信息,请告诉我。

Joh*_*lay 7

由于您的方法涉及 I/O,它们应该被编写为真正异步的,而不仅仅是使用Task.Run.

然后你可以Task.WhenAll结合使用Enumerable.Select

var tasks = someCollection.Select(async item =>
{
    var country = await GetCountryAsync(item.Id);
    var state = await GetStateAsync(country.CountryID);
    var calculation = SomeCalculation(country, state);

    return (country, state, calculation);
});

foreach (var tuple in await Task.WhenAll(tasks))
{
    countries.Add(tuple.country);
    states.Add(tuple.state);
    myCollection.AddRange(tuple.calculation);
}
Run Code Online (Sandbox Code Playgroud)

这将确保各country> state>calculation顺序发生,但每个item被同时处理,和异步。


根据评论更新

using var semaphore = new SemaphoreSlim(2);
using var cts = new CancellationTokenSource();

int failures = 0;

var tasks = someCollection.Select(async item =>
{
    await semaphore.WaitAsync(cts.Token);
    
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

        Interlocked.Exchange(ref failures, 0);

        return (country, state, calculation);
    {
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        semaphore.Release();
    }
});
Run Code Online (Sandbox Code Playgroud)

信号量保证最多 2 个并发异步操作,并且取消令牌会在 10 个连续异常后取消所有未完成的任务。

这些Interlocked方法确保failures以线程安全的方式访问它。


进一步更新

使用 2 个信号量来防止多次迭代可能更有效。

将所有列表添加封装到一个方法中:

void AddToLists(Country country, State state, Calculation calculation)
{
    countries.Add(country);
    states.Add(state);
    myCollection.AddRange(calculation);
}
Run Code Online (Sandbox Code Playgroud)

然后,您可以允许 2 个线程同时为 Http 请求提供服务,并允许 1 个线程执行添加,从而使该操作成为线程安全的:

using var httpSemaphore = new SemaphoreSlim(2);
using var listAddSemaphore = new SemaphoreSlim(1);
using var cts = new CancellationTokenSource();

int failures = 0;

await Task.WhenAll(someCollection.Select(async item =>
{
    await httpSemaphore.WaitAsync(cts.Token);
    
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

        await listAddSemaphore.WaitAsync(cts.Token);
        AddToLists(country, state, calculation);

        Interlocked.Exchange(ref failures, 0);
    {
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        httpSemaphore.Release();
        listAddSemaphore.Release();
    }
}));
Run Code Online (Sandbox Code Playgroud)


Mar*_*ell 5

我认为你把这个问题复杂化了;在 内部Parallel.ForEach,您已经在线程池中,因此在内部创建大量附加任务确实没有任何好处。所以; 如何做到这一点实际上取决于GetStateetc 是同步还是异步。如果我们假设是同步的,那么类似于:

Parallel.ForEach(someCollection, parallelOptions, (item, _) =>
{
    var country = GetCountry(item.Id);

    countries.Add(country); // warning: may need to synchronize

    var state = GetState(country.CountryID);

    states.Add(state); // warning: may need to synchronize

    // use data from both the async calls pass it to below function for some calculation
    // in a synchronized way (for a country, its corresponding state should be passed)
    myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});
Run Code Online (Sandbox Code Playgroud)

如果它们是异步的,就会变得更尴尬;这将是很好,如果我们可以这样做:

// WARNING: DANGEROUS CODE - DO NOT COPY
Parallel.ForEach(someCollection, parallelOptions, async (item, _) =>
{
    var country = await GetCountryAsync(item.Id);

    countries.Add(country); // warning: may need to synchronize

    var state = await GetStateAsync(country.CountryID);

    states.Add(state); // warning: may need to synchronize

    // use data from both the async calls pass it to below function for some calculation
    // in a synchronized way (for a country, its corresponding state should be passed)
    myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});
Run Code Online (Sandbox Code Playgroud)

但这里的问题是没有一个回调Parallel.ForEach是“awaitable”的,这意味着:我们在async void这里默默地创建了一个回调,这是:非常糟糕。这意味着Parallel.ForEach一旦非完成await发生,它就会认为它已经“完成” ,这意味着:

  1. 我们不知道所有工作何时真正完成
  2. 你可能同时做的事情比你预期的要多得多(无法遵守 max-dop)

目前似乎没有任何好的 API 可以避免这种情况。

  • @SouvikGhosh oof,确实 - `Parallel.ForEach` 有“action”主体(没有 `Func&lt;T&gt;` 主体),所以:如果我们使用 `async` 版本,它将使用 `async void`,其中*真的很糟糕*;`GetCountry` 和 `GetState` 是可等待方法吗? (2认同)