如何以有效的方式在 sql server 表中插入/更新数百万行?

Mar*_*lli 5 performance sql-server insert update sql-server-2014 query-performance

通常在我的工作中,我必须在 SQL Server 中创建一个过程,该过程将处理数百万数据行,将它们保存到临时表(临时表)中,最后将它们保存到数据库中的一个或多个表中。

我不是在寻找替代解决方案,例如 SSIS。

我无法禁用索引、约束、使数据库脱机、更改恢复模式等。

我们一直在寻找将这个过程设置为在系统不那么繁忙时运行的方法,但我们在 24/7/365 在线零售商环境中工作。

有一个非常相似的问题: Performance Inserting and Updating Millions of rows into a table

这个问题也很重要: 插入大量行的最快方法是什么?

例子一:

CREATE PROCEDURE [dbo].[udpstaging_page_import_fromFilter]
      @sourceDesc nvarchar(50) -- e.g. 'Coremetrics'
      ,@feedDesc nvarchar(50) -- e.g. 'Daily Exports'
      ,@process_job_task_logID bigint
AS BEGIN


      SET NOCOUNT ON;
      BEGIN TRY


            --truncate table prior INSERT
            exec dbo.udpstaging_page_truncateTable;

            declare @source_feedID int;
            exec crm_admin.dbo.udpsource_feedID_select @sourceDesc
                  ,@feedDesc
                  ,@source_feedID = @source_feedID OUTPUT;


            -- drop temp tables
            if exists (select * from tempdb.dbo.sysobjects o where o.xtype in ('U') and o.id = object_id(N'tempdb..#pageImport'))
                  drop table #pageImport;


            -- create temp tables
            create table #pageImport(
                  pageImportID [bigint] identity(1,1) NOT NULL
                  ,pageCode [varbinary](16) NOT NULL
            );


            insert into #pageImport(
                  pageCode
            )
            select pageCode
            from Coremetrics.PageView
            group by pageCode;


            -- add indexes to temp table
            CREATE CLUSTERED INDEX IDX_pageImport_pageImportID ON #pageImport(pageImportID);
            CREATE INDEX IDX_pageImport_pageCode ON #pageImport(pageCode);


            declare @recordCount bigint
                  ,@counter int
                  ,@maxCounter int
                  ,@updateRowCount int;

            select @counter = MIN(pageImportID)
                  ,@recordCount = MAX(pageImportID)
            from #pageImport;

            set @updateRowCount = 1000000;

            while @counter <= @recordCount
            begin

                  set @maxCounter = (@counter + @updateRowCount - 1);

                  with pageImport as (
                        select pv.pageCode
                              ,pv.websiteCode as 'pageCIV'
                              ,dbo.udfDerivePageName(pv.PAGE_ID, pv.CONTENT_CATEGORY_ID) as 'pageName'
                              ,dbo.udfDerivePageName(pv.PAGE_ID, pv.CONTENT_CATEGORY_ID) as 'pageDesc'
                              ,pv.[TIMESTAMP] as 'pageCreateDate'
                              ,pv.pageTypeCode
                              ,'' as 'pageTypeCIV'
                              ,pv.websiteCode
                              ,pv.marketID
                              ,@source_feedID as 'source_feedID'
                              ,@process_job_task_logID as 'process_job_task_logID'
                              ,GETDATE() as 'createdDate'
                              ,SUSER_NAME() as 'createdBy'
                              ,GETDATE() as 'modifiedDate'
                              ,SUSER_NAME() as 'modifiedBy'
                              ,ROW_NUMBER() over (
                                    PARTITION BY [pi].pageCode
                                    ORDER BY pv.[TIMESTAMP]
                              ) as 'is_masterPageImport'
                        from #pageImport [pi]
                        inner join Coremetrics.PageView pv on pv.pageCode = [pi].pageCode
                              and [pi].pageImportID between @counter and @maxCounter
                  )                 
                  insert into staging.[page](
                        pageCode
                        ,pageCIV
                        ,pageName
                        ,pageDesc
                        ,pageCreateDate
                        ,pageTypeCode
                        ,pageTypeCIV
                        ,websiteCode
                        ,marketID
                        ,source_feedID
                        ,process_job_task_logID
                        ,createdDate
                        ,createdBy
                        ,modifiedDate
                        ,modifiedBy
                  )
                  select pageCode
                        ,pageCIV
                        ,pageName
                        ,pageDesc
                        ,pageCreateDate
                        ,pageTypeCode
                        ,pageTypeCIV
                        ,websiteCode
                        ,marketID
                        ,source_feedID
                        ,process_job_task_logID
                        ,createdDate
                        ,createdBy
                        ,modifiedDate
                        ,modifiedBy 
                  from pageImport
                  where 1 = 1
                        and is_masterPageImport = 1;

                  set @counter = @counter + @updateRowCount;
            end;


            SET NOCOUNT OFF;
            RETURN 0;

      END TRY
      BEGIN CATCH
            print N'inner catch: ' + error_message();
            SET NOCOUNT OFF;
            RETURN -10;
      END CATCH

END;
Run Code Online (Sandbox Code Playgroud)

示例二:

这只是太大而无法在此处发布的存储过程的一部分。

IF OBJECT_ID('tempdb.dbo.#ztblOrgProductStockView', 'U') IS NOT NULL
    DROP TABLE #ztblOrgProductStockView;

CREATE TABLE #ztblOrgProductStockView (
    [lngID] [int] NOT NULL IDENTITY PRIMARY KEY,
    [sintMarketId] [smallint] NOT NULL,
    [sintChannelId] [smallint] NOT NULL,
    [strOrgVwName] [varchar](50) COLLATE Latin1_General_CI_AS NOT NULL,
    [tintSequence] [tinyint] NOT NULL,
    [tintOrgGrpId] [tinyint] NOT NULL,
    [strTier1] [varchar](20) COLLATE Latin1_General_CI_AS NOT NULL,
    [strTier2] [varchar](20) COLLATE Latin1_General_CI_AS NOT NULL,
    [strTier3] [varchar](20) COLLATE Latin1_General_CI_AS NOT NULL,
    [strTier4] [varchar](20) COLLATE Latin1_General_CI_AS NOT NULL,
    [strTier5] [varchar](20) COLLATE Latin1_General_CI_AS NOT NULL,
    [strTier6] [varchar](20) COLLATE Latin1_General_CI_AS NOT NULL,
    [strItemNo] [varchar](20) COLLATE Latin1_General_CI_AS NOT NULL,
    [strStockTypeName] [varchar](50) COLLATE Latin1_General_CI_AS NOT NULL,
    [tintStockTypeId] [tinyint] NOT NULL,
    [sintDueWithinDays] [tinyint] NOT NULL,
    [bitOverSellingAllowed] [bit] NOT NULL,
    [dtmStartDate] [datetime] NULL,
    [dtmEndDate] [datetime] NULL,
    [dtmExpected] [datetime] NULL,
    [blnIsLocalToMarket] [bit] NULL,
    [blnPremiunDelvAllowed] [bit] NULL,
    [strStdDeliveryDaysCode] [varchar](20)
)



INSERT into #ztblOrgProductStockView (
    sintMarketId
    ,sintChannelId
    ,strOrgVwName
    ,tintSequence
    ,tintOrgGrpId
    ,strTier1
    ,strTier2
    ,strTier3
    ,strTier4
    ,strTier5
    ,strTier6
    ,strItemNo
    ,strStockTypeName
    ,tintStockTypeId
    ,sintDueWithinDays
    ,bitOverSellingAllowed
    ,dtmStartDate
    ,dtmEndDate
    ,dtmExpected
    ,blnIsLocalToMarket
    ,blnPremiunDelvAllowed
    ,strStdDeliveryDaysCode
)
    select
        rv.sintMarketId
        ,rv.sintChannelId
        ,rv.strOrgVwName
        ,tintSequence
        ,tintOrgGrpId
        ,ISNULL(rv.pnTier1,'ÿ')
        ,ISNULL(rv.pnTier2,'ÿ')
        ,ISNULL(rv.pnTier3,'ÿ')
        ,ISNULL(rv.strTier4,'ÿ')
        ,ISNULL(rv.strTier5,'ÿ')
        ,ISNULL(rv.strTier6,'ÿ')
        ,rv.strItemNo
        ,strStockTypeName
        ,tintStockTypeId
        ,sintDueWithinDays
        ,bitOverSellingAllowed
        ,dtmStartDate
        ,dtmEndDate
        ,dtmExpected
        ,blnIsLocalToMarket
        ,blnPremiunDelvAllowed
        ,strStdDeliveryDaysCode
    from #ztblOrgProductRangeView_1 rv
    inner join #ztblOrgProductSeqView_1 sv on rv.strItemNo = sv.strItemNo
        and rv.lngOrgVwId = sv.lngOrgVwId
    --order by rv.sintMarketId, rv.sintChannelId, sv.tintOrgGrpId, rv.strItemNo, sv.tintStockTypeId

--set @DebugDate = convert(nvarchar(10),getdate(),108)
--raiserror('%s [%s]', 0, 1, N'Populated #ztblOrgProductStockView', @DebugDate) with nowait

--select [sintMarketId], [sintChannelId], [tintOrgGrpId], [strItemNo], [tintStockTypeId], count(*)
--from [#ztblOrgProductStockView]
--group by [sintMarketId], [sintChannelId], [tintOrgGrpId], [strItemNo], [tintStockTypeId]
--having count(*) > 1

    set @lngRowcount = @@ROWCOUNT
    set nocount on;

        While @lngRowcount > 0
        Begin
            Set @lngMinID = @lngMaxID
            Set @lngMaxID = @lngMaxID + 5000

            INSERT INTO [ztblOrgProductStockView]
                   ([sintActiveView]    
                   ,[sintMarketId]
                   ,[sintChannelId]
                   ,[strOrgVwName]  
                   ,[tintSequence]
                   ,[tintOrgGrpId]
                   ,[strTier1]
                   ,[strTier2]
                   ,[strTier3]
                   ,[strTier4]
                   ,[strTier5]
                   ,[strTier6]
                   ,[strItemNo]
                   ,[strStockTypeName]
                   ,[tintStockTypeId]
                   ,[sintDueWithinDays]
                   ,[bitOverSellingAllowed]
                   ,[dtmStartDate]
                   ,[dtmEndDate]
                   ,[dtmExpected]
                   ,[blnIsLocalToMarket]
                   ,[blnPremiunDelvAllowed]
                   ,[strStdDeliveryDaysCode])
            Select
                    @sintActiveView_new
                   ,[sintMarketId]
                   ,[sintChannelId]
                   ,[strOrgVwName]
                   ,[tintSequence]
                   ,[tintOrgGrpId]
                   ,[strTier1]
                   ,[strTier2]
                   ,[strTier3]
                   ,[strTier4]
                   ,[strTier5]
                   ,[strTier6]
                   ,[strItemNo]
                   ,[strStockTypeName]
                   ,[tintStockTypeId]
                   ,[sintDueWithinDays]
                   ,[bitOverSellingAllowed]
                   ,[dtmStartDate]
                    ,[dtmEndDate]
                   ,[dtmExpected]
                   ,[blnIsLocalToMarket]
                   ,[blnPremiunDelvAllowed]
                   ,[strStdDeliveryDaysCode]
            From #ztblOrgProductStockView
            Where lngID >= @lngMinID
            And   lngID <  @lngMaxID

            set @lngRowcount = @@ROWCOUNT

        End
Run Code Online (Sandbox Code Playgroud)

问题 请注意,基于意见的答案在这里并不是最受欢迎的,请尽可能提供证据。

1)如何确定组织批次大小的最佳方式?例如在示例二上它是 5000。

2)如果我BEGIN TRANSACTIONCOMMIT TRANSACTION在 内,它通常会有更多的机会提高性能while loop吗?一笔交易。

3)如果我想更改批处理的大小,我可以监控什么来决定是否可以增加批处理的大小,或者我是否会导致 I/O 延迟?

我目前使用以下脚本找到 I/O 延迟:

-- How to identify I/O latency issues
-- Below SQL code helps in identifying the I/O latency issues in a SQL Server system on a per-file basis.
-- http://sqlserverdbknowledge.wordpress.com/2011/11/08/how-to-identify-io-latency-issues/
--http://www.sqlskills.com/blogs/paul/how-to-examine-io-subsystem-latencies-from-within-sql-server/

--MARCELO MIORELLI 26-JULY-2013

SELECT 
--- virtual file latency
ReadLatency = CASE WHEN num_of_reads = 0
THEN 0 ELSE (io_stall_read_ms / num_of_reads) END,
WriteLatency = CASE WHEN num_of_writes = 0 
THEN 0 ELSE (io_stall_write_ms / num_of_writes) END,
Latency = CASE WHEN (num_of_reads = 0 AND num_of_writes = 0)
THEN 0 ELSE (io_stall / (num_of_reads + num_of_writes)) END,
--– avg bytes per IOP
AvgBPerRead = CASE WHEN num_of_reads = 0 
THEN 0 ELSE (num_of_bytes_read / num_of_reads) END,
AvgBPerWrite = CASE WHEN io_stall_write_ms = 0 
THEN 0 ELSE (num_of_bytes_written / num_of_writes) END,
AvgBPerTransfer = CASE WHEN (num_of_reads = 0 AND num_of_writes = 0)
THEN 0 ELSE ((num_of_bytes_read + num_of_bytes_written) / 
(num_of_reads + num_of_writes)) END, 
LEFT (mf.physical_name, 2) AS Drive,
DB_NAME (vfs.database_id) AS DB,
--- –vfs.*,
mf.physical_name
FROM sys.dm_io_virtual_file_stats (NULL,NULL) AS vfs
JOIN sys.master_files AS mf
ON vfs.database_id = mf.database_id
AND vfs.file_id = mf.file_id
--WHERE vfs.file_id = 2 — log files
-- ORDER BY Latency DESC
-- ORDER BY ReadLatency DESC
ORDER BY WriteLatency DESC;
GO 
Run Code Online (Sandbox Code Playgroud)

Sol*_*zky 4

基于意见的答案在这里并不是最受欢迎的,请尽可能尝试提供证据。

好吧,这并不完全公平,因为最终,最有效的“证据”将来自您的系统;-)。您的硬件、您的数据、您的系统负载等将决定什么最有效。如何处理事情以及系统如何工作都存在很多变数,因此在一个系统中效果最好的方法在另一个系统中可能并不那么好。

1)如何决定组织批次大小的最佳方式?例如,在示例二中,它是 5000。

这主要是一个反复试验的问题,看看什么最有效。但是,请务必记住,锁升级通常发生在 5000 个锁时。根据数据的组织方式,5000 个更改可能是 5000 个行锁或几个页锁。这是基于每个对象的,因为不同索引中行的顺序可能不同。要点是,在这些操作期间对需要被其他进程使用的表所做的更改应该尽量避免表锁(即锁升级的结果)。但是诸如临时表和临时表之类的表将受益于表锁,因为它是单个操作并且不应该存在争用,因此TABLOCK在执行“批量”操作时会出现提示INSERT

2)如果我在 while 循环内开始事务并提交事务,通常会有更多机会提高性能吗?一笔交易为一批。

当这些操作是逐行进行时,将多个 DML 操作包装到显式事务中可以极大地提高性能。从您在此处发布的代码来看,您已经在执行基于集合的操作,因此将任何内容组合到事务中只会带来很小的时间优势。此外,在WHILE两个示例的循环中,您都在执行单个INSERT操作,这是它自己的事务,因此在循环内添加事务WHILE无论如何都不会获得任何好处。并且,在循环中添加显式事务WHILE会将整个集合放入单个事务中,这可能会对时间安排有所帮助,但您也会有一个巨大的事务,这会增加阻塞的机会并有助于 LOG文件增长,因为此事务将持续更长时间。

3)如果我想更改批处理的大小,我可以监视什么来决定是否可以增加批处理的大小,或者是否会导致 I/O 延迟?

监控进程运行得更快还是更慢。尝试几种不同的批量大小,并让每一种批量运行该过程的多次迭代。跟踪每个批量大小的流程运行多长时间,您将找到最有效的方法。


沿着这些思路,我至少会尝试减少示例 1 中的百万行批处理大小。

我还将标量 UDF 转换为内联 TVF,并使用或dbo.udfDerivePageName将其合并到查询(示例 1)中。并且,考虑到对 UDF 的两次调用都传递相同的两个参数,您只需引用返回的字段两次(一次 as和一次 as ),而不是对 iTVF 进行两次调用。CROSS APPLYOUTER APPLYpageNamepageDesc

减少目标表争用的另一个选项(如果这只是插入新行而不更新现有行)是使用表分区。这将允许您像当前正在做的那样暂存数据,但是您不需要将新数据插入到活动表中,而是创建SWITCH新分区,这是一个相当快速的操作。这对减少暂存数据所需的时间或 I/O 没有什么帮助,但它可以消除通过将暂存数据“合并”到实时表中所花费的时间和争用。这是值得研究的事情。


归档时间:

查看次数:

9857 次

最近记录:

9 年,10 月 前