将 csv 文件加载到 SQL Server 2016 的最有效方法是什么?

son*_*hoc 4 sql-server c# csv sql-server-2016

好的,所以这个问题看起来很简单。好吧,它不仅仅是加载文件。

整个故事是这样的:出于某种原因,我们的客户向我们发送了只能被描述为关系数据库的内容,被压缩,压缩成单个 csv 文件(但分隔符是波浪号而不是逗号)。实际上,这有点可怕;相同的数据在整个文件中无限重复。因此,为了使这些数据恢复秩序,我将其加载到实际的关系数据库中。由于数据量很大,将其加载到数据库中可以更轻松地检查数据是否存在问题。它还使导出变得更加容易。

每条记录有 53 行,每次传输大约有 250,000 条记录。我想把它分成 6 个标准化表。我不确定是否要验证 C# 程序或我正在使用的 SQL Server 2016 LocalDb 实例中的数据。

我不是经验丰富的 DBA;我是一名 C# 程序员,对 SQL 有所涉猎。我对语法感到很舒服,但我想确保我这样做是正确的。

此外,一切都必须完全自动化。文件进来,C# 程序在收到文件时启动,并将其加载到数据库中。

让我解释一下布局。该文件有 53 个字段,每行包含一个报表明细行(他们的收费内容、项目收费的频率、该项目的总成本或贷项等)。问题是每一行都包含整个邮寄、付款人、居民、财产和汇款的信息。知道了这一点,让我解释一下我现在是如何做的:

  1. 打开文件
  2. 对于文件的每一行,检索描述邮寄、付款人、居民、财产和汇款目的地的表的键。
  3. 将该数据与缓存的数据进行比较。如果缓存的数据无效,请查询数据库以查看是否已添加该实体。如果没有,请创建它。缓存那个。
  4. 添加新的详细信息行并将其与邮件相关联,邮件与详细信息是一对多的关系。(邮寄本身与付款人、财产和汇款是多对一的关系。)
  5. 完成后关闭文件。

这不是世界上最慢的事情,但它完全是在 RAM 中完成的。由于程序在当前状态下非常接近 RAM 耗尽,我们决定将数据加载到数据库中,而不是将其全部保存在本地 RAM 中。希望这可以为潜在的回答者提供更多信息。谢谢!

Sol*_*zky 5

我首选的传递多个字段的多行(即不是简单的分隔列表)的方法是使用表值参数 (TVP)。这个想法是,您将在 .NET 代码中逐行读取文件,但将数据一次性全部流式传输,或者如果对于一个事务来说太多了,则将其分成若干批,使用 TVP 传输到 SQL Server。TVP 本质上是一个表变量,它是存储过程的输入参数,因此这将是基于集合的操作(就 SQL Server 而言),而不是逐行操作。从技术上讲,不需要存储过程,因为 TVP 可以作为参数发送到即席查询,但无论如何使用存储过程只是一种更好的方法。

在从文件读取时使用 TVP 有两种主要模式(每个模式都依赖于传入返回IEnumerable<SqlDataRecord>而不是 a 的方法DataTable):

  1. 执行“导入”存储过程,它启动文件打开和读取。所有行都被读取(并且此时可以被验证)并流入存储过程。在这种方法中,存储过程只执行一次,所有行作为一个集合发送。这是更简单的方法,但对于较大的数据集(即数百万行),如果操作是将数据直接合并到活动表中而不是简单地加载到临时表中,则它可能不会执行最佳操作。这种方法所需的内存是 1 条记录的大小。

  2. 创建一个变量int _BatchSize,打开文件,然后:

    1. 创建一个集合来保存这批记录
      1. 循环 for _BatchSizeor 直到没有更多的行可以从文件中读取
        1. 读一行
        2. 证实
        3. 在集合中存储有效条目
      2. 在每个循环结束时执行存储过程,在集合中流式传输。
      3. 这种方法所需的内存是 1 条记录 * 的大小_BatchSize
      4. 好处是数据库中的事务不依赖于任何磁盘 I/O 延迟或业务逻辑延迟。
    2. 循环执行存储过程,直到没有更多行可以从文件中读取
      1. 执行存储过程
        1. 循环 for _BatchSizeor 直到没有更多的行可以从文件中读取
          1. 读一行
          2. 证实
          3. 将记录流式传输到 SQL Server
      2. 这种方法所需的内存是 1 条记录的大小。
      3. 缺点是数据库中的事务取决于磁盘 I/O 延迟和/或业务逻辑延迟,因此可能打开更长时间,因此更有可能发生阻塞。

我在以下答案中提供了 StackOverflow 上模式 #1 的完整示例:如何在最短的时间内插入 1000 万条记录?

对于模式#2.1(一种高度可扩展的方法),我在下面有一个部分示例:

所需的数据库对象(使用人为的结构):

首先,您需要一个用户定义的表类型 (UDTT)。

请注意UNIQUE,在记录到达 SQL Server 之前DEFAULT,使用、 和CHECK约束来强制执行数据完整性。唯一约束也是您在表变量上创建索引的方式:)。

CREATE TYPE [ImportStructure] AS TABLE
(
    BatchRecordID INT IDENTITY(1, 1) NOT NULL,
    Name NVARCHAR(200) NOT NULL,
    SKU VARCHAR(50) NOT NULL UNIQUE,
    LaunchDate DATETIME NULL,
    Quantity INT NOT NULL DEFAULT (0),
    CHECK ([Quantity] >= 0)
);
GO
Run Code Online (Sandbox Code Playgroud)

接下来,使用 UDTT 作为导入存储过程的输入参数(因此称为“表值参数”)。

CREATE PROCEDURE dbo.ImportData (
   @CustomerID     INT,
   @ImportTable    dbo.ImportStructure READONLY
)
AS
SET NOCOUNT ON;

UPDATE prod
SET    prod.[Name] = imp.[Name],
       prod.[LaunchDate] = imp.[LaunchDate],
       prod.[Quantity] = imp.[Quantity]
FROM   [Inventory].[Products] prod
INNER JOIN @ImportTable imp
        ON imp.[SKU] = prod.[SKU]
WHERE  prod.CustomerID = @CustomerID;

INSERT INTO [Inventory].[Products] ([CustomerID], [SKU], [Name], [LaunchDate], [Quantity])
    SELECT  @CustomerID, [SKU], [Name], [LaunchDate], [Quantity]
    FROM    @ImportTable imp
    WHERE   NOT EXISTS (SELECT prod.[SKU]
                        FROM   [Inventory].[Products] prod
                        WHERE  prod.[SKU] = imp.[SKU]
                       );
GO
Run Code Online (Sandbox Code Playgroud)

应用代码:

首先,我们将定义将用于存储批记录的类:

using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using Microsoft.SqlServer.Server;

private class ImportBatch
{
  string Name;
  string SKU;
  DateTime LaunchDate;
  int Quantity;
}
Run Code Online (Sandbox Code Playgroud)

接下来,我们定义用于将数据从集合流式传输到 SQL Server 的方法。请注意:

  • SqlMetaData入境BatchRecordID,即使它定义的是一个IDENTITY领域。但是,它的定义方式表明该值是服务器生成的。
  • yield return返回的记录,但随后控制是右后卫到下一行(这只是又回到了循环的顶部)。
private static IEnumerable<SqlDataRecord> SendImportBatch(List<ImportBatch> RecordsToSend)
{
   SqlMetaData[] _TvpSchema = new SqlMetaData[] {
      new SqlMetaData("BatchRecordID", SqlDbType.Int, true, false,
                      SortOrder.Unspecified, -1),
      new SqlMetaData("Name", SqlDbType.NVarChar, 200),
      new SqlMetaData("SKU", SqlDbType.VarChar, 50),
      new SqlMetaData("LaunchDate", SqlDbType.DateTime),
      new SqlMetaData("Quantity", SqlDbType.Int)
   };

   SqlDataRecord _DataRecord = new SqlDataRecord(_TvpSchema);

   // Stream the collection into SQL Server without first
   // copying it into a DataTable.
   foreach (ImportBatch _RecordToSend in RecordsToSend)
   {
      // we don't set field 0 as that is the IDENTITY field
      _DataRecord.SetString(1, _RecordToSend.Name);
      _DataRecord.SetString(2, _RecordToSend.SKU);
      _DataRecord.SetDateTime(3, _RecordToSend.LaunchDate);
      _DataRecord.SetInt32(4, _RecordToSend.Quantity);

      yield return _DataRecord;
   }
}
Run Code Online (Sandbox Code Playgroud)

最后,我们定义了整体的导入处理操作。它打开到 SQL Server 的连接和文件,然后循环读取文件并验证_BatchSize每个循环的记录数。存储过程参数只定义一次,因为它们不会改变:CustomerID值不会改变,并且 TVP 参数值只是对方法的引用SendImportBatch——只有在通过 执行存储过程时才会被调用ExecuteNonQuery。作为 TVP 值传入的方法的输入参数是引用类型,因此它应始终反映该变量/对象的当前值。

public static void ProcessImport(int CustomerID)
{
   int _BatchSize = GetBatchSize();
   string _ImportFilePath = GetImportFileForCustomer(CustomerID);

   List<ImportBatch> _CurrentBatch = new List<ImportBatch>();
   ImportBatch _CurrentRecord;

   SqlConnection _Connection = new SqlConnection("{connection string}");
   SqlCommand _Command = new SqlCommand("ImportData", _Connection);
   _Command.CommandType = CommandType.StoredProcedure;

   // Parameters do not require leading "@" when using CommandType.StoredProcedure
   SqlParameter _ParamCustomerID = new SqlParameter("CustomerID", SqlDbType.Int);
   _ParamCustomerID.Value = CustomerID;
   _Command.Parameters.Add(_ParamCustomerID);

   SqlParameter _ParamImportTbl = new SqlParameter("ImportTable", SqlDbType.Structured);
   // TypeName is not needed when using CommandType.StoredProcedure
   //_ParamImportTbl.TypeName = "dbo.ImportStructure";
   // Parameter value is method that returns streamed data (IEnumerable)
   _ParamImportTbl.Value = SendImportBatch(_CurrentBatch);
   _Command.Parameters.Add(_ParamImportTbl);

   StreamReader _FileReader = null;

   try
   {
      int _RecordCount;
      string[] _InputLine = new string[4];

      _Connection.Open();

      _FileReader = new StreamReader(_ImportFilePath);

       // process the file
       while (!_FileReader.EndOfStream)
       {
          _RecordCount = 1;

          // process a batch
          while (_RecordCount <= _BatchSize
                  && !_FileReader.EndOfStream)
          {
             _CurrentRecord = new ImportBatch();

             _InputLine = _FileReader.ReadLine().Split(new char[]{','});

             _CurrentRecord.Name = _InputLine[0];
             _CurrentRecord.SKU = _InputLine[1];
             _CurrentRecord.LaunchDate = DateTime.Parse(_InputLine[2]);
             _CurrentRecord.Quantity = Int32.Parse(_InputLine[3]);

             // Do validations, transformations, etc
             if (record is not valid)
             {
                _CurrentRecord = null;
                continue; // skip to next line in the file
             }

             _CurrentBatch.Add(_CurrentRecord);
             _RecordCount++; // only increment for valid records
          }

          _Command.ExecuteNonQuery(); // send batch to SQL Server

          _CurrentBatch.Clear();
       }
   }
   finally
   {
      _FileReader.Close();

      _Connection.Close();
   }

   return;
}
Run Code Online (Sandbox Code Playgroud)

如果READONLYTVP的性质禁止在合并到目标表之前需要进行某些验证和/或转换,则可以在存储过程开始时轻松地将 TVP 中的数据传输到本地临时表。