Tus*_*aru 3 performance sql-server sql-server-2012
我有一个 SQL Server 2012 数据库。有一个表的行数超过 1 亿。我想将该表拆分为多个相似的表,每个表中有 100 万行。快速实现这一目标的最佳方法是什么?
我试过分页,但它花费了太多时间,有时在 SSMS 中会出现内存不足的异常。
有 12 列。11个是varchar,一个是datetime。表中没有可用的主键,也没有ID列。这是我们迁移到 SQL Server 2012 的一个非常旧的数据库表。
它的大小约为 20 GB。没有可用的索引。我不想更复杂,我只需要拆分它。
是的,即使没有任何索引,也可以将这个 1 亿行的表拆分为任意数量的表。您也许完全可以只用 T-SQL 来完成,但那需要在一个事务中用单条 INSERT 语句加载每个拆分表,这对事务日志来说负担可能较重,尤其是在每个拆分表有 200 万行的情况下。不过,得益于表值参数 (TVP) 出色的完全流式 API,一个小型(且相对简单)的 C#/VB.Net(或任何 .Net 语言)应用程序就可以处理这个问题:应用程序端占用的内存不会超过 1 行(数据库端缓冲池的占用则无法避免),而且使用的是最小日志记录的操作!诀窍是:
将 SqlParameter 的值设置为一个返回 IEnumerable<SqlDataRecord> 的方法。这样就可以随着 SqlDataReader 读取源记录而逐行流式发送数据。使用这种方法,源表的大小应该无关紧要:拆分一个 10 亿行的表只是需要更多的时间(当然还有复制表所需的磁盘空间)。第一:你需要一个用户定义的表类型 (UDTT)
-- User-defined table type (UDTT) describing one row of the table being split.
-- It is the type of the table-valued parameter accepted by dbo.ImportChunk.
-- The "..." line stands for the elided StringField03 - StringField09 columns.
CREATE TYPE [dbo].[SplitTable] AS TABLE
(
    [CreateDate]    DATETIME     NOT NULL,
    [StringField01] VARCHAR(500) NULL,
    [StringField02] VARCHAR(500) NULL,
    ...
    [StringField10] VARCHAR(500) NULL,
    [StringField11] VARCHAR(500) NULL
);
GO

-- Callers need EXECUTE permission on the type itself to pass it as a parameter.
GRANT EXECUTE
    ON TYPE::[dbo].[SplitTable]
    TO [user_or_role];
GO
Run Code Online (Sandbox Code Playgroud)
第二:将该 UDTT 用作导入存储过程的输入参数,这就是所谓的“表值参数”(TVP)
-- DROP PROCEDURE dbo.ImportChunk

-- Creates one split table, dbo.SplitTableName_NNNN, and loads it with the rows
-- streamed in through the table-valued parameter.
--
--   @TableNumber : sequence number used to build the target table name. It is
--                  left-padded with zeros and trimmed by RIGHT(..., 4), so
--                  numbers above 9999 keep only their last four digits.
--   @SplitTable  : the rows for this chunk, or a single marker row (see below)
--                  when the caller has no more rows to send.
--
-- The table creation, synonym creation, insert and synonym drop run in one
-- transaction; on any error the transaction is rolled back and the error
-- message is re-raised to the caller.
CREATE PROCEDURE dbo.ImportChunk (
    @TableNumber INT,
    @SplitTable dbo.SplitTable READONLY
)
AS
SET NOCOUNT ON;

DECLARE @SQL NVARCHAR(MAX),
        @CurrentTableName sysname;

BEGIN TRY
    -- Empty TVPs are not allowed, and can't send DbNull.Value when using
    -- IEnumerable<SqlDataRecord>, so the caller sends a single marker row whose
    -- CreateDate is CONVERT(DATETIME, 0), i.e. 1900-01-01.
    -- The marker is only honoured when the TVP holds exactly one row: a real chunk
    -- that merely contains a 1900-01-01 row must not be skipped. TOP (2) keeps the
    -- row-count probe cheap no matter how large the chunk is.
    IF (    (SELECT COUNT(*)
             FROM (SELECT TOP (2) 1 AS [RowMarker] FROM @SplitTable) AS FirstTwo) = 1
        AND (SELECT TOP (1) [CreateDate] FROM @SplitTable) = CONVERT(DATETIME, 0))
    BEGIN
        RETURN;
    END;

    BEGIN TRAN;

    SET @CurrentTableName = N'dbo.SplitTableName_'
                          + RIGHT(N'000' + CONVERT(NVARCHAR(20), @TableNumber), 4);

    PRINT 'CurrentTableName: ' + @CurrentTableName;

    -- The table name is dynamic, so the CREATE TABLE has to be Dynamic SQL.
    -- (The "..." line stands for the elided StringField03 - StringField09 columns.)
    SET @SQL = N'CREATE TABLE '
             + @CurrentTableName
             + N' (
CreateDate DATETIME NOT NULL,
StringField01 VARCHAR(500) NULL,
StringField02 VARCHAR(500) NULL,
...
StringField10 VARCHAR(500) NULL,
StringField11 VARCHAR(500) NULL
);';
    EXEC(@SQL);

    -- Use a SYNONYM as we are dealing with dynamic table names which requires Dynamic SQL
    -- but the TVP is a table variable and cannot be referenced in Dynamic SQL. However,
    -- a SYNONYM can be created in Dynamic SQL and is a consistent object name that can
    -- be used in the static / main portion of this code where the TVP can be used.
    -- http://msdn.microsoft.com/en-us/library/ms177544.aspx (CREATE SYNONYM)
    SET @SQL = N'CREATE SYNONYM CurrentImportTable FOR '
             + @CurrentTableName;
    EXEC(@SQL);

    -- The WITH (TABLOCK) hint is required for this query to be considered minimally logged.
    -- http://msdn.microsoft.com/en-us/library/ms174335.aspx (INSERT)
    INSERT INTO CurrentImportTable WITH (TABLOCK)
           (CreateDate, StringField01, StringField02, --...,
            StringField10, StringField11)
    SELECT tmp.CreateDate,
           tmp.StringField01,
           tmp.StringField02,
           ...
           tmp.StringField10,
           tmp.StringField11
    FROM   @SplitTable tmp;

    DROP SYNONYM CurrentImportTable; -- Clean up.

    COMMIT TRAN;
END TRY
BEGIN CATCH
    -- Undo the partially created table / synonym before reporting the failure.
    IF (@@TRANCOUNT > 0)
    BEGIN
        ROLLBACK;
    END;

    DECLARE @Message NVARCHAR(4000);
    SET @Message = ERROR_MESSAGE();
    RAISERROR(@Message, 16, 1);
END CATCH;
GO

-- Grant on the procedure created above (the original granted on a non-existent
-- dbo.ImportData, which fails and leaves callers without EXECUTE permission).
GRANT EXECUTE ON dbo.ImportChunk TO [user_or_role];
Run Code Online (Sandbox Code Playgroud)
第三:在要拆分的表上用一个简单的 SELECT 打开 SqlDataReader(本质上是一个游标)。诀窍是实现一个方法(下面的 SendRows),它从 SqlDataReader 逐行读取并通过 yield return 逐行返回,每次调用最多返回 _RowsPerSplit 行。
然后调用上面显示的存储过程,传入循环计数器(以便正确命名新表)以及该方法通过 IEnumerable 流式返回的行。不断调用该过程,直到结果集 / SqlDataReader 中没有更多记录为止。
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using Microsoft.SqlServer.Server;
namespace TableSplitter
{
    /// <summary>
    /// Console utility that copies dbo.TableToSplit into a series of split tables by
    /// streaming rows from a <see cref="SqlDataReader"/> into the dbo.ImportChunk
    /// stored procedure through a table-valued parameter.
    /// </summary>
    class Program
    {
        // CreateDate value of the single marker row sent when a chunk has no rows.
        // It equals CONVERT(DATETIME, 0) on the SQL Server side, which is what
        // dbo.ImportChunk tests for.
        private static readonly DateTime EmptyChunkMarker = new DateTime(1900, 1, 1);

        // The first two fields are static because their values need to be passed back
        // to the main loop, but iterators (i.e. methods returning IEnumerable) cannot
        // have "ref" parameters.
        private static SqlDataReader _RowsToSend;
        private static int _RowsCopied;
        private static int _RowsPerSplit;

        /// <summary>
        /// Streams up to <c>_RowsPerSplit</c> rows from <c>_RowsToSend</c>, one record
        /// at a time, reusing a single <see cref="SqlDataRecord"/> instance.
        /// </summary>
        /// <returns>
        /// The rows of the next chunk. When the reader has no rows left, a single
        /// marker row (CreateDate = 1900-01-01) is returned instead, because an empty
        /// sequence cannot be sent as a table-valued parameter.
        /// </returns>
        /// <remarks>
        /// On completion <c>_RowsCopied</c> holds the number of real rows streamed;
        /// the caller uses it to decide whether another chunk is needed.
        /// </remarks>
        private static IEnumerable<SqlDataRecord> SendRows()
        {
            SqlMetaData[] tvpSchema = new SqlMetaData[]
            {
                new SqlMetaData("CreateDate", SqlDbType.DateTime),
                new SqlMetaData("StringField01", SqlDbType.VarChar, 500),
                new SqlMetaData("StringField02", SqlDbType.VarChar, 500),
                //...
                new SqlMetaData("StringField10", SqlDbType.VarChar, 500),
                new SqlMetaData("StringField11", SqlDbType.VarChar, 500)
            };

            SqlDataRecord dataRecord = new SqlDataRecord(tvpSchema);

            // Sized from the schema so the buffer always matches the record's
            // column count instead of relying on a hard-coded number.
            object[] allFields = new object[tvpSchema.Length];

            // read a row, send a row
            for (_RowsCopied = 0; _RowsCopied < _RowsPerSplit; _RowsCopied++)
            {
                if (!_RowsToSend.Read())
                {
                    break;
                }

                _RowsToSend.GetValues(allFields);
                dataRecord.SetValues(allFields);

                yield return dataRecord;
            }

            if (_RowsCopied == 0)
            {
                // Nothing left to copy: send the marker row so the TVP is not empty.
                dataRecord.SetDateTime(0, EmptyChunkMarker);

                yield return dataRecord;
            }
        }

        /// <summary>
        /// Switches the database to BULK_LOGGED recovery, streams dbo.TableToSplit
        /// into dbo.ImportChunk in chunks of <c>_RowsPerSplit</c> rows (one new table
        /// per chunk), and finally switches the recovery model to SIMPLE.
        /// </summary>
        /// <param name="ConnectionString">
        /// Connection string used for both the source (reader) connection and the
        /// destination (stored procedure) connection.
        /// </param>
        /// <exception cref="InvalidOperationException">
        /// <c>_RowsPerSplit</c> is not positive; with such a value every call would
        /// send only the marker row and the copy loop would never end.
        /// </exception>
        /// <remarks>
        /// Errors raised while copying are written to the console rather than
        /// rethrown. Cleanup is safe to run even when a connection failed to open.
        /// </remarks>
        public static void SplitTable(string ConnectionString)
        {
            if (_RowsPerSplit <= 0)
            {
                throw new InvalidOperationException(
                    "_RowsPerSplit must be greater than zero before calling SplitTable.");
            }

            _RowsCopied = _RowsPerSplit; // seed value to enter the loop
            int tableNumber = 1;

            // Only undo the recovery-model change if we actually made it.
            bool recoveryModelChanged = false;

            using (SqlConnection sourceConnection = new SqlConnection(ConnectionString))
            using (SqlCommand sourceCommand = new SqlCommand(
                "ALTER DATABASE [Test] SET RECOVERY BULK_LOGGED;",
                sourceConnection))
            using (SqlConnection destinationConnection = new SqlConnection(ConnectionString))
            using (SqlCommand destinationCommand = new SqlCommand(
                "dbo.ImportChunk",
                destinationConnection))
            {
                sourceCommand.CommandType = CommandType.Text;
                destinationCommand.CommandType = CommandType.StoredProcedure;

                SqlParameter tvParam = new SqlParameter();
                tvParam.ParameterName = "@SplitTable";
                tvParam.SqlDbType = SqlDbType.Structured;
                destinationCommand.Parameters.Add(tvParam);

                SqlParameter tableNumParam = new SqlParameter();
                tableNumParam.ParameterName = "@TableNumber";
                tableNumParam.SqlDbType = SqlDbType.Int;
                destinationCommand.Parameters.Add(tableNumParam);

                try
                {
                    sourceConnection.Open();

                    sourceCommand.ExecuteNonQuery();
                    recoveryModelChanged = true;

                    sourceCommand.CommandText =
                        @"SELECT CreateDate, StringField01, StringField02, --...,
StringField10, StringField11 FROM dbo.TableToSplit;";
                    _RowsToSend = sourceCommand.ExecuteReader();

                    destinationConnection.Open();

                    // A chunk shorter than _RowsPerSplit means the reader ran dry.
                    while (_RowsCopied == _RowsPerSplit)
                    {
                        tableNumParam.Value = tableNumber;
                        tvParam.Value = SendRows(); // method return value is streamed data
                        destinationCommand.ExecuteNonQuery();

                        tableNumber++;
                    }
                }
                catch (Exception exception)
                {
                    System.Console.WriteLine("\n\n" + exception.Message + "\n\n");
                }
                finally
                {
                    // The reader is null when opening the source or running the
                    // first command failed; closing it unguarded would throw here
                    // and skip the rest of the cleanup.
                    if (_RowsToSend != null)
                    {
                        _RowsToSend.Close();
                        _RowsToSend = null;
                    }

                    destinationConnection.Close();

                    if (recoveryModelChanged
                        && sourceConnection.State == ConnectionState.Open)
                    {
                        try
                        {
                            sourceCommand.CommandText =
                                "ALTER DATABASE [Test] SET RECOVERY SIMPLE;"; // or FULL
                            sourceCommand.ExecuteNonQuery();
                        }
                        catch (SqlException restoreError)
                        {
                            // Report instead of throwing out of the finally block.
                            System.Console.WriteLine(
                                "\n\nCould not reset the recovery model: "
                                + restoreError.Message + "\n\n");
                        }
                    }
                }
            }

            return;
        }

        /// <summary>
        /// Entry point: splits the table into chunks of one million rows and prints
        /// the elapsed time in milliseconds.
        /// </summary>
        static void Main(string[] args)
        {
            System.Diagnostics.Stopwatch elapsedTime = new System.Diagnostics.Stopwatch();
            elapsedTime.Start();

            _RowsPerSplit = 1000000;
            SplitTable("Trusted_Connection = yes; Database = Test;");

            elapsedTime.Stop();

            System.Console.WriteLine("\n\n\tElapsed Millseconds: {0}\n\n",
                elapsedTime.ElapsedMilliseconds);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)
我在笔记本电脑上针对一个示例表运行了它:该表的列中一个是 DATETIME,其他 4 个是 VARCHAR。
我运行了几次,大约需要 19 - 24 秒才能拆分成 3 个表,每个表有 100 万行。
| 归档时间: |
|
| 查看次数: |
14625 次 |
| 最近记录: |