Microsoft.EntityFrameworkCore.EntityFrameworkQueryableExtensions.ForEachAsync<T>() 的意外行为

And*_*rew 5 c# entity-framework-core ef-core-2.0 ef-core-2.1

以下是重现的步骤。下面的程序使用 .Net Core 控制台应用程序和 EF Core 将 10,000 行从一个 SQL 表复制到另一个。该程序分 100 批插入记录,并且(这很重要!)它为每个插入创建一个新的 DbContext 实例。

1) 创建 SQL Server 数据库,以及“Froms”和“Tos”表:

create table Froms (
    Id int identity(1, 1) not null,
    Guid [uniqueidentifier] not null,

    constraint [PK_Froms] primary key clustered (Id asc)
)
go

create table Tos (
    Id int not null,
    Guid [uniqueidentifier] not null,

    constraint [PK_Tos] primary key clustered (Id asc)
)
go
Run Code Online (Sandbox Code Playgroud)

2) 填充“Froms”表:

set nocount on
declare @i int = 0

while @i < 10000
begin
    insert Froms (Guid)
    values (newid())

    set @i += 1
end
go
Run Code Online (Sandbox Code Playgroud)

3) 创建名为 .Net Core 控制台应用程序项目TestForEachAsync。将 C# 的版本更改为 7.1 或更高版本(需要用于async Main)。添加Microsoft.EntityFrameworkCore.SqlServernuget包。

4)创建类:

数据库实体

using System;

namespace TestForEachAsync
{
    public class From
    {
        public int Id { get; set; }
        public Guid Guid { get; set; }
    }
}
Run Code Online (Sandbox Code Playgroud)
using System;

namespace TestForEachAsync
{
    public class To
    {
        public int Id { get; set; }
        public Guid Guid { get; set; }
    }
}
Run Code Online (Sandbox Code Playgroud)

数据库上下文

using Microsoft.EntityFrameworkCore;

namespace TestForEachAsync
{
    public class Context : DbContext
    {
        public DbSet<From> Froms { get; set; }
        public DbSet<To> Tos { get; set; }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            optionsBuilder.UseSqlServer("YOUR_CONNECTION_STRING");
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

主要的

using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

namespace TestForEachAsync
{
    internal class Program
    {
        private static async Task Main(string[] args)
        {
            //Get the "froms"
            var selectContext = new Context();
            var froms = selectContext.Froms.Select(f => new { f.Id, f.Guid });

            int count = 0;
            Task<int> saveChangesTask = null;
            Context insertContext = new Context();
            Context prevInsertContext = null;

            //Iterate through "froms"
            await froms.ForEachAsync(
                async f =>
                {
                    //Add instace of "to" to the context
                    var to = new To { Id = f.Id, Guid = f.Guid };
                    await insertContext.Tos.AddAsync(to);
                    count++;

                    //If another 100 of "to"s has been added to the context...
                    if (count % 100 == 0)
                    {
                        //Wait for the previous 100 "to"s to finish saving to the database
                        if (saveChangesTask != null)
                        {
                            await saveChangesTask;
                        }

                        //Start saving the next 100 "to"s
                        saveChangesTask = insertContext.SaveChangesAsync();

                        //Dispose of the context that was used to save previous 100 "to"s
                        prevInsertContext?.Dispose();

                        //Reassign the context used to save the current 100 "to"s to a "prev" variable,
                        //and set context variable to the new Context instance.
                        prevInsertContext = insertContext;
                        insertContext = new Context();
                    }
                }
            );

            //Wait for second last 100 "to"s to finish saving to the database
            if (saveChangesTask != null)
            {
                await saveChangesTask;
            }

            //Save the last 100 "to"s to the database
            await insertContext.SaveChangesAsync();
            insertContext.Dispose();

            Console.WriteLine("Done");
            Console.ReadKey();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

5)运行应用程序 - 你得到一个例外The connection does not support MultipleActiveResultSets。看起来正在启动多个操作insertContext,但我不明白为什么。

6)我找到了两种解决问题的方法:

  • await froms.ForEachAsync(...)用“正常”循环替换循环foreach (var f in froms) {...},或
  • 在异步循环内,替换await saveChangesTask;saveChangesTask.Wait();

但是有人可以解释一下为什么原始代码不能按我的预期工作吗?

注意:如果您多次运行该应用程序,请不要忘记在每次运行前截断“Tos”表。

Iva*_*oev 5

您正陷入将异步 lambda 传递给期望委托返回 void(Action<T>在这种特殊情况下)的方法的典型陷阱,如Stephen Toub传递异步 lambdas 时要避免的潜在陷阱中所述。这实际上相当于使用async void它的陷阱,因为您的异步代码根本没有 - awaited,从而破坏了它的内部逻辑。

解决方案和往常一样是一个特殊的重载,它接受Func<T, Task>而不是Action<T>. 可能它应该由 EF Core 提供(你可以考虑发布一个请求),但现在你可以用这样的东西自己实现它:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Extensions.Internal;

namespace Microsoft.EntityFrameworkCore
{
    public static class AsyncExtensions
    {
        public static Task ForEachAsync<T>(this IQueryable<T> source, Func<T, Task> action, CancellationToken cancellationToken = default) =>
            source.AsAsyncEnumerable().ForEachAsync(action, cancellationToken);

        public static async Task ForEachAsync<T>(this IAsyncEnumerable<T> source, Func<T, Task> action, CancellationToken cancellationToken = default)
        {
            using (var asyncEnumerator = source.GetEnumerator())
                while (await asyncEnumerator.MoveNext(cancellationToken))
                    await action(asyncEnumerator.Current);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这基本上是EF Core 实现,添加awaitaction.

一旦你这样做了,你的代码将解析为这个方法,一切都应该按预期工作。