MD *_*AIN 26 c# sql-server import bulkinsert table-valued-parameters
我有一个文件(有1000万条记录),如下所示:
line1
line2
line3
line4
.......
......
10 million lines
Run Code Online (Sandbox Code Playgroud)
所以基本上我想在数据库中插入1000万条记录.所以我读了文件并将其上传到SQL Server.
C#代码
System.IO.StreamReader file =
new System.IO.StreamReader(@"c:\test.txt");
while((line = file.ReadLine()) != null)
{
// insertion code goes here
//DAL.ExecuteSql("insert into table1 values("+line+")");
}
file.Close();
Run Code Online (Sandbox Code Playgroud)
但插入需要很长时间.如何使用C#在尽可能短的时间内插入1000万条记录?
更新1:
批量插入:
BULK INSERT DBNAME.dbo.DATAs
FROM 'F:\dt10000000\dt10000000.txt'
WITH
(
ROWTERMINATOR =' \n'
);
Run Code Online (Sandbox Code Playgroud)
我的表如下:
DATAs
(
DatasField VARCHAR(MAX)
)
Run Code Online (Sandbox Code Playgroud)
但我得到以下错误:
Msg 4866,Level 16,State 1,Line 1
批量加载失败.第1行第1列的数据文件中的列太长.验证是否正确指定了字段终止符和行终止符.消息7399,级别16,状态1,行1
链接服务器"(null)"的OLE DB提供程序"BULK"报告错误.提供商未提供有关错误的任何信息.消息7330,级别16,状态2,行1
无法从OLE DB提供程序"BULK"获取链接服务器"(null)"的行.
下面的代码工作:
BULK INSERT DBNAME.dbo.DATAs
FROM 'F:\dt10000000\dt10000000.txt'
WITH
(
FIELDTERMINATOR = '\t',
ROWTERMINATOR = '\n'
);
Run Code Online (Sandbox Code Playgroud)
Sol*_*zky 42
请不要不创建一个DataTable通过BulkCopy加载.对于较小的数据集,这是一个很好的解决方案,但在调用数据库之前,绝对没有理由将所有1000万行加载到内存中.
您最好的选择(在BCP/ BULK INSERT/ 之外OPENROWSET(BULK...))是通过表值参数(TVP)将文件中的内容流式传输到数据库中.通过使用TVP,您可以打开文件,读取行并发送一行直到完成,然后关闭文件.此方法的内存占用量仅为一行.我写了一篇文章,从一个应用程序中将数据流式传输到SQL Server 2008,这个例子就是这个场景.
结构的简单概述如下.我假设相同的导入表和字段名称,如上面的问题所示.
必需的数据库对象
-- First: You need a User-Defined Table Type
CREATE TYPE ImportStructure AS TABLE (Field VARCHAR(MAX));
GO
-- Second: Use the UDTT as an input param to an import proc.
-- Hence "Tabled-Valued Parameter" (TVP)
CREATE PROCEDURE dbo.ImportData (
@ImportTable dbo.ImportStructure READONLY
)
AS
SET NOCOUNT ON;
-- maybe clear out the table first?
TRUNCATE TABLE dbo.DATAs;
INSERT INTO dbo.DATAs (DatasField)
SELECT Field
FROM @ImportTable;
GO
Run Code Online (Sandbox Code Playgroud)
使用上述SQL对象的C#app代码如下.请注意如何填充对象(例如DataTable)然后执行存储过程,而在此方法中,执行存储过程会启动文件内容的读取.存储过程的输入参数不是变量; 它是方法的返回值,GetFileContents.当打开文件的SqlCommand调用ExecuteNonQuery读取一行并通过IEnumerable<SqlDataRecord>和yield return构造将行发送到SQL Server 时,将调用该方法,然后关闭该文件.存储过程只看到一个表变量@ImportTable,一旦数据开始过来就可以访问(注意:数据确实会持续很短的时间,即使不是tempdb中的完整内容).
using System.Collections;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using Microsoft.SqlServer.Server;
private static IEnumerable<SqlDataRecord> GetFileContents()
{
SqlMetaData[] _TvpSchema = new SqlMetaData[] {
new SqlMetaData("Field", SqlDbType.VarChar, SqlMetaData.Max)
};
SqlDataRecord _DataRecord = new SqlDataRecord(_TvpSchema);
StreamReader _FileReader = null;
try
{
_FileReader = new StreamReader("{filePath}");
// read a row, send a row
while (!_FileReader.EndOfStream)
{
// You shouldn't need to call "_DataRecord = new SqlDataRecord" as
// SQL Server already received the row when "yield return" was called.
// Unlike BCP and BULK INSERT, you have the option here to create a string
// call ReadLine() into the string, do manipulation(s) / validation(s) on
// the string, then pass that string into SetString() or discard if invalid.
_DataRecord.SetString(0, _FileReader.ReadLine());
yield return _DataRecord;
}
}
finally
{
_FileReader.Close();
}
}
Run Code Online (Sandbox Code Playgroud)
上述GetFileContents方法用作存储过程的输入参数值,如下所示:
public static void test()
{
SqlConnection _Connection = new SqlConnection("{connection string}");
SqlCommand _Command = new SqlCommand("ImportData", _Connection);
_Command.CommandType = CommandType.StoredProcedure;
SqlParameter _TVParam = new SqlParameter();
_TVParam.ParameterName = "@ImportTable";
_TVParam.TypeName = "dbo.ImportStructure";
_TVParam.SqlDbType = SqlDbType.Structured;
_TVParam.Value = GetFileContents(); // return value of the method is streamed data
_Command.Parameters.Add(_TVParam);
try
{
_Connection.Open();
_Command.ExecuteNonQuery();
}
finally
{
_Connection.Close();
}
return;
}
Run Code Online (Sandbox Code Playgroud)
补充说明:
SELECT在proc 中的语句中操作每个记录的值.SqlBulkCopy:
SqlBulkCopy仅使用INSERT,而使用TVP可以以任何方式使用数据:您可以调用MERGE; 你可以DELETE根据某些条件; 您可以将数据拆分为多个表; 等等.ExecuteReader而不是从数据库中获取数据ExecuteNonQuery.例如,如果导入表IDENTITY上有一个字段,则DATAs可以OUTPUT向INSERT要传递的子句添加一个子句INSERTED.[ID](假设ID是IDENTITY字段的名称).或者您可以传回完全不同的查询结果,或两者都可以,因为可以通过发送和访问多个结果集Reader.NextResult().在使用时无法从数据库中获取信息,SqlBulkCopy但是在这里有几个问题想要做到这一点的人(至少关于新创建的IDENTITY值).在C#中,最好的解决方案是让SqlBulkCopy文件读取.为此,您需要IDataReader直接传递给SqlBulkCopy.WriteToServer方法.这是一个例子:http://www.codeproject.com/Articles/228332/IDataReader-implementation-plus-SqlBulkCopy