使用Dapper,批量插入的时间比预期的要长

ken*_*ner 64 performance sqlbulkcopy dapper

阅读本文后,我决定仔细研究一下我使用Dapper的方式.

我在一个空数据库上运行此代码

var members = new List<Member>();
for (int i = 0; i < 50000; i++)
{
    members.Add(new Member()
    {
        Username = i.toString(),
        IsActive = true
    });
}

using (var scope = new TransactionScope())
{
    connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members);

    scope.Complete();
}
Run Code Online (Sandbox Code Playgroud)

花了大约20秒钟.这是2500插入/秒.不错,但考虑到博客达到45k插入/秒,也不是很好.在Dapper中有更有效的方法吗?

另外,作为旁注,通过Visual Studio调试器运行此代码需要3分钟!我认为调试器会慢一点,但我很惊讶地看到了这么多.

UPDATE

所以这

using (var scope = new TransactionScope())
{
    connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members);

    scope.Complete();
}
Run Code Online (Sandbox Code Playgroud)

还有这个

    connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members);
Run Code Online (Sandbox Code Playgroud)

都花了20秒.

但这需要4秒!

SqlTransaction trans = connection.BeginTransaction();

connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members, transaction: trans);

trans.Commit();
Run Code Online (Sandbox Code Playgroud)

ken*_*ner 72

使用这种方法,我能够在4秒内获得的最佳记录是50k记录

SqlTransaction trans = connection.BeginTransaction();

connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members, transaction: trans);

trans.Commit();
Run Code Online (Sandbox Code Playgroud)

  • @Parhs:这是50%的改进.</迂腐> (11认同)
  • @chester89 我同意你的看法。这不是批量插入任何东西。这只是常规插入。我不认为 dapper 在这一点上可以处理批量插入 (3认同)
  • 伙计们-我只是看着sql server profiler而感到困惑。此代码不进行批处理(至少对我而言)-每次操作插入一行(表示1500行-1500个单独的插入命令)。我将Sql Server 2012与最新的dapper一起使用 (2认同)

Fre*_*ung 11

我最近偶然发现了这一点,并注意到在连接打开后创建了TransactionScope(我假设这是因为Dappers Execute没有打开连接,这与Query不同).根据答案Q4:https://stackoverflow.com/a/2886326/455904,不会导致TransactionScope处理连接.我的同事做了一些快速测试,在TransactionScope之外打开连接大大降低了性能.

因此,更改为以下内容应该有效:

// Assuming the connection isn't already open
using (var scope = new TransactionScope())
{
    connection.Open();
    connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members);

    scope.Complete();
}
Run Code Online (Sandbox Code Playgroud)

  • 如果你尝试`ExecuteAsync`,这将抛出一个异常:"一个TransactionScope必须放在它创建的同一个线程上".为了避免这种情况:`using(var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))` (4认同)

Cal*_*ass 6

我创建了一个扩展方法,可以让您非常快速地进行批量插入。

public static class DapperExtensions
{
    public static async Task BulkInsert<T>(
        this IDbConnection connection,
        string tableName,
        IReadOnlyCollection<T> items,
        Dictionary<string, Func<T, object>> dataFunc)
    {
        const int MaxBatchSize = 1000;
        const int MaxParameterSize = 2000;

        var batchSize = Math.Min((int)Math.Ceiling((double)MaxParameterSize / dataFunc.Keys.Count), MaxBatchSize);
        var numberOfBatches = (int)Math.Ceiling((double)items.Count / batchSize);
        var columnNames = dataFunc.Keys;
        var insertSql = $"INSERT INTO {tableName} ({string.Join(", ", columnNames.Select(e => $"[{e}]"))}) VALUES ";
        var sqlToExecute = new List<Tuple<string, DynamicParameters>>();

        for (var i = 0; i < numberOfBatches; i++)
        {
            var dataToInsert = items.Skip(i * batchSize)
                .Take(batchSize);
            var valueSql = GetQueries(dataToInsert, dataFunc);

            sqlToExecute.Add(Tuple.Create($"{insertSql}{string.Join(", ", valueSql.Item1)}", valueSql.Item2));
        }

        foreach (var sql in sqlToExecute)
        {
            await connection.ExecuteAsync(sql.Item1, sql.Item2, commandTimeout: int.MaxValue);
        }
    }

    private static Tuple<IEnumerable<string>, DynamicParameters> GetQueries<T>(
        IEnumerable<T> dataToInsert,
        Dictionary<string, Func<T, object>> dataFunc)
    {
        var parameters = new DynamicParameters();

        return Tuple.Create(
            dataToInsert.Select(e => $"({string.Join(", ", GenerateQueryAndParameters(e, parameters, dataFunc))})"),
            parameters);
    }

    private static IEnumerable<string> GenerateQueryAndParameters<T>(
        T entity,
        DynamicParameters parameters,
        Dictionary<string, Func<T, object>> dataFunc)
    {
        var paramTemplateFunc = new Func<Guid, string>(guid => $"@p{guid.ToString().Replace("-", "")}");
        var paramList = new List<string>();

        foreach (var key in dataFunc)
        {
            var paramName = paramTemplateFunc(Guid.NewGuid());
            parameters.Add(paramName, key.Value(entity));
            paramList.Add(paramName);
        }

        return paramList;
    }
}
Run Code Online (Sandbox Code Playgroud)

然后要使用此扩展方法,您将编写如下代码:

await dbConnection.BulkInsert(
    "MySchemaName.MyTableName",
    myCollectionOfItems,
    new Dictionary<string, Func<MyObjectToInsert, object>>
        {
            { "ColumnOne", u => u.ColumnOne },
            { "ColumnTwo", u => u.ColumnTwo },
            ...
        });
Run Code Online (Sandbox Code Playgroud)

这是相当原始的,并且还有进一步改进的空间,例如传入事务或 commandTimeout 值,但它对我来说很有效。