如何在最短的时间内插入1000万条记录?

MD *_*AIN 26 c# sql-server import bulkinsert table-valued-parameters

我有一个文件(有1000万条记录),如下所示:

    line1
    line2
    line3
    line4
   .......
    ......
    10 million lines
Run Code Online (Sandbox Code Playgroud)

所以基本上我想在数据库中插入1000万条记录.所以我读了文件并将其上传到SQL Server.

C#代码

System.IO.StreamReader file = 
    new System.IO.StreamReader(@"c:\test.txt");
while((line = file.ReadLine()) != null)
{
    // insertion code goes here
    //DAL.ExecuteSql("insert into table1 values("+line+")");
}

file.Close();
Run Code Online (Sandbox Code Playgroud)

但插入需要很长时间.如何使用C#在尽可能短的时间内插入1000万条记录?

更新1:
批量插入:

BULK INSERT DBNAME.dbo.DATAs
FROM 'F:\dt10000000\dt10000000.txt'
WITH
(

     ROWTERMINATOR =' \n'
  );
Run Code Online (Sandbox Code Playgroud)

我的表如下:

DATAs
(
     DatasField VARCHAR(MAX)
)
Run Code Online (Sandbox Code Playgroud)

但我得到以下错误:

Msg 4866,Level 16,State 1,Line 1
批量加载失败.第1行第1列的数据文件中的列太长.验证是否正确指定了字段终止符和行终止符.

消息7399,级别16,状态1,行1
链接服务器"(null)"的OLE DB提供程序"BULK"报告错误.提供商未提供有关错误的任何信息.

消息7330,级别16,状态2,行1
无法从OLE DB提供程序"BULK"获取链接服务器"(null)"的行.

下面的代码工作:

BULK INSERT DBNAME.dbo.DATAs
FROM 'F:\dt10000000\dt10000000.txt'
WITH
(
    FIELDTERMINATOR = '\t',
    ROWTERMINATOR = '\n'
);
Run Code Online (Sandbox Code Playgroud)

Sol*_*zky 42

请不要创建一个DataTable通过BulkCopy加载.对于较小的数据集,这是一个很好的解决方案,但在调用数据库之前,绝对没有理由将所有1000万行加载到内存中.

您最好的选择(在BCP/ BULK INSERT/ 之外OPENROWSET(BULK...))是通过表值参数(TVP)将文件中的内容流式传输到数据库中.通过使用TVP,您可以打开文件,读取行并发送一行直到完成,然后关闭文件.此方法的内存占用量仅为一行.我写了一篇文章,从一个应用程序中将数据流式传输到SQL Server 2008,这个例子就是这个场景.

结构的简单概述如下.我假设相同的导入表和字段名称,如上面的问题所示.

必需的数据库对象

-- First: You need a User-Defined Table Type
CREATE TYPE ImportStructure AS TABLE (Field VARCHAR(MAX));
GO

-- Second: Use the UDTT as an input param to an import proc.
--         Hence "Tabled-Valued Parameter" (TVP)
CREATE PROCEDURE dbo.ImportData (
   @ImportTable    dbo.ImportStructure READONLY
)
AS
SET NOCOUNT ON;

-- maybe clear out the table first?
TRUNCATE TABLE dbo.DATAs;

INSERT INTO dbo.DATAs (DatasField)
    SELECT  Field
    FROM    @ImportTable;

GO
Run Code Online (Sandbox Code Playgroud)

使用上述SQL对象的C#app代码如下.请注意如何填充对象(例如DataTable)然后执行存储过程,而在此方法中,执行存储过程会启动文件内容的读取.存储过程的输入参数不是变量; 它是方法的返回值,GetFileContents.当打开文件的SqlCommand调用ExecuteNonQuery读取一行并通过IEnumerable<SqlDataRecord>yield return构造将行发送到SQL Server 时,将调用该方法,然后关闭该文件.存储过程只看到一个表变量@ImportTable,一旦数据开始过来就可以访问(注意:数据确实会持续很短的时间,即使不是tempdb中的完整内容).

using System.Collections;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using Microsoft.SqlServer.Server;

private static IEnumerable<SqlDataRecord> GetFileContents()
{
   SqlMetaData[] _TvpSchema = new SqlMetaData[] {
      new SqlMetaData("Field", SqlDbType.VarChar, SqlMetaData.Max)
   };
   SqlDataRecord _DataRecord = new SqlDataRecord(_TvpSchema);
   StreamReader _FileReader = null;

   try
   {
      _FileReader = new StreamReader("{filePath}");

      // read a row, send a row
      while (!_FileReader.EndOfStream)
      {
         // You shouldn't need to call "_DataRecord = new SqlDataRecord" as
         // SQL Server already received the row when "yield return" was called.
         // Unlike BCP and BULK INSERT, you have the option here to create a string
         // call ReadLine() into the string, do manipulation(s) / validation(s) on
         // the string, then pass that string into SetString() or discard if invalid.
         _DataRecord.SetString(0, _FileReader.ReadLine());
         yield return _DataRecord;
      }
   }
   finally
   {
      _FileReader.Close();
   }
}
Run Code Online (Sandbox Code Playgroud)

上述GetFileContents方法用作存储过程的输入参数值,如下所示:

public static void test()
{
   SqlConnection _Connection = new SqlConnection("{connection string}");
   SqlCommand _Command = new SqlCommand("ImportData", _Connection);
   _Command.CommandType = CommandType.StoredProcedure;

   SqlParameter _TVParam = new SqlParameter();
   _TVParam.ParameterName = "@ImportTable";
   _TVParam.TypeName = "dbo.ImportStructure";
   _TVParam.SqlDbType = SqlDbType.Structured;
   _TVParam.Value = GetFileContents(); // return value of the method is streamed data
   _Command.Parameters.Add(_TVParam);

   try
   {
      _Connection.Open();

      _Command.ExecuteNonQuery();
   }
   finally
   {
      _Connection.Close();
   }

   return;
}
Run Code Online (Sandbox Code Playgroud)

补充说明:

  1. 通过一些修改,上面的C#代码可以适用于批量处理数据.
  2. 通过微小的修改,上面的C#代码可以适用于发送多个字段("Steaming Data ..."文章中显示的示例链接在2个字段中的上面传递).
  3. 您还可以SELECT在proc 中的语句中操作每个记录的值.
  4. 您还可以使用proc中的WHERE条件过滤掉行.
  5. 您可以多次访问TVP表变量; 它是READONLY但不是"仅向前".
  6. 优点SqlBulkCopy:
    1. SqlBulkCopy仅使用INSERT,而使用TVP可以以任何方式使用数据:您可以调用MERGE; 你可以DELETE根据某些条件; 您可以将数据拆分为多个表; 等等.
    2. 由于TVP不是INSERT,因此您不需要单独的临时表来转储数据.
    3. 您可以通过调用ExecuteReader而不是从数据库中获取数据ExecuteNonQuery.例如,如果导入表IDENTITY上有一个字段,则DATAs可以OUTPUTINSERT要传递的子句添加一个子句INSERTED.[ID](假设IDIDENTITY字段的名称).或者您可以传回完全不同的查询结果,或两者都可以,因为可以通过发送和访问多个结果集Reader.NextResult().在使用时无法从数据库中获取信息,SqlBulkCopy但是在这里有几个问题想要做到这一点的人(至少关于新创建的IDENTITY值).
    4. 有关为什么整个过程有时更快的更多信息,即使从磁盘获取数据到SQL Server的速度稍慢,请参阅SQL Server客户咨询团队的白皮书:使用TVP最大化吞吐量


pla*_*ful 6

在C#中,最好的解决方案是让SqlBulkCopy文件读取.为此,您需要IDataReader直接传递给SqlBulkCopy.WriteToServer方法.这是一个例子:http://www.codeproject.com/Articles/228332/IDataReader-implementation-plus-SqlBulkCopy

  • @srutzky假设所提出的问题是"如何在最短的时间内插入1000万条[C#]?".我提出的解决方案是最符合问题的解决方案,换句话说,它是提出的问题的最佳解决方案.因为,正如你所说:SqlBulkCopy只允许将数据插入表中 - 这正是问题范围. (3认同)