处理多维数据集的有效方法

Yah*_*ufi 6 sql-server olap ssas ssis sql-server-2014

我正在使用SSAS构建多维多维数据集,我根据日期列创建了分区,并为每天定义了一个分区。源数据大小大于2 TB。

在部署和处理多维数据集时,如果发生错误,则不会保存所有已处理分区,并且它们的状态仍未处理。

搜索了一段时间后,我发现以下文章提到了这一点:

并行(处理选项):用于批处理。此设置导致Analysis Services派生处理任务以在单个事务中并行运行。如果发生故障,则结果是所有更改都将回滚。

搜索之后,我发现了一种替代方法,可以从SSIS包中一步一步处理分区,如以下文章中所述:

但是处理时间增加了400%以上。有没有一种有效的方法可以并行处理分区,而不会在发生错误时丢失所有进度?

Had*_*adi 7

如果需要从并行处理选项中受益,则不能强制停止所有已处理分区的回滚。

解决类似问题的首选方法之一是分批处理分区。无需一次处理所有分区,而是可以自动并行处理每个n个分区。(经过许多经验,我发现在我的机器上将MaxParallel选项配置为10是最佳解决方案)。

然后,如果发生错误,将仅回滚当前批次。

在这个答案中,我将尝试提供逐步指南,以使用SSIS自动化批量处理分区。

套餐概述

  1. 一批建筑尺寸
  2. 获取未处理的分区数
  3. 循环分区(每个循环读取10个分区)
  4. 处理数据
  5. 工艺指标

套餐详情

创建变量

首先,我们必须添加一些在过程中需要的变量:

在此处输入图片说明

  • intCount,intCurrent:在forloop容器中使用
  • p_Cube:多维数据集对象ID
  • p_Database:分析数据库ID
  • p_MaxParallel:一批中要处理的分区数
  • p_MeasureGroup:测量组对象标识
  • p_ServerName:分析服务实例名称 <Machine Name>\<Instance Name>
  • strProcessData,strProcessDimensions和strProcessIndexes:用于存储与处理数据,索引和维相关的XMLA查询

名称开头的所有变量p_都是必需的,可以将其添加为参数。

为Analysis Services添加连接管理器

添加变量后,我们必须创建一个连接管理器以连接到SQL Server Analysis Service实例:

  1. 首先,我们必须手动配置连接管理器:

在此处输入图片说明

  1. 然后,我们必须分配服务器名称和初始目录表达式,如下图所示:

在此处输入图片说明

  1. 将连接管理器重命名为ssas

在此处输入图片说明

加工尺寸

首先,添加序列容器以隔离包中的维处理,然后添加脚本任务和Analysis Services处理任务:

在此处输入图片说明

在此处输入图片说明

打开脚本任务和选择p_Databasep_MaxParallel为只读变量,并strProcessDimensions为读写变量:

在此处输入图片说明

现在,打开脚本编辑器并使用以下代码:

该代码是准备XMLA命令以处理维的方法,此XMLA查询将在Analysis Services处理任务中使用

#region Namespaces
using System;
using System.Data;
using System.Data.SqlClient;
using Microsoft.SqlServer.Dts.Runtime;
using System.Linq;
using System.Windows.Forms;
using Microsoft.AnalysisServices;
#endregion

namespace ST_00ad89f595124fa7bee9beb04b6ad3d9
{

    [Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {

        public void Main()
        {
            Server myServer = new Server();

            string ConnStr = Dts.Connections["ssas"].ConnectionString;
            myServer.Connect(ConnStr);

            Database db = myServer.Databases.GetByName(Dts.Variables["p_Database"].Value.ToString());

            int maxparallel = (int)Dts.Variables["p_MaxParallel"].Value;

            var dimensions = db.Dimensions; 

            string strData;

            strData = "<Batch xmlns=\"http://schemas.microsoft.com/analysisservices/2003/engine\"> \r\n <Parallel MaxParallel=\"" + maxparallel.ToString() + "\"> \r\n";

            foreach (Dimension dim in dimensions)
            {
             strData +=
             "    <Process xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:ddl2=\"http://schemas.microsoft.com/analysisservices/2003/engine/2\" xmlns:ddl2_2=\"http://schemas.microsoft.com/analysisservices/2003/engine/2/2\" xmlns:ddl100_100=\"http://schemas.microsoft.com/analysisservices/2008/engine/100/100\" xmlns:ddl200=\"http://schemas.microsoft.com/analysisservices/2010/engine/200\" xmlns:ddl200_200=\"http://schemas.microsoft.com/analysisservices/2010/engine/200/200\" xmlns:ddl300=\"http://schemas.microsoft.com/analysisservices/2011/engine/300\" xmlns:ddl300_300=\"http://schemas.microsoft.com/analysisservices/2011/engine/300/300\" xmlns:ddl400=\"http://schemas.microsoft.com/analysisservices/2012/engine/400\" xmlns:ddl400_400=\"http://schemas.microsoft.com/analysisservices/2012/engine/400/400\"> \r\n" +
             "     <Object> \r\n" +
             "       <DatabaseID>" + db.ID + "</DatabaseID> \r\n" +
             "       <DimensionID>" + dim.ID + "</DimensionID> \r\n" +
             "     </Object> \r\n" +
             "     <Type>ProcessFull</Type> \r\n" +
             "     <WriteBackTableCreation>UseExisting</WriteBackTableCreation> \r\n" +
             "    </Process> \r\n";
            }

            //}

            strData += " </Parallel> \r\n</Batch>";

            Dts.Variables["strProcessDimensions"].Value = strData;
            Dts.TaskResult = (int)ScriptResults.Success;
        }

        #region ScriptResults declaration

        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion

    }
}
Run Code Online (Sandbox Code Playgroud)

现在,打开Analysis Services处理任务并手动定义任何任务,然后转到表达式并将strProcessDimensions变量分配给ProcessingCommands属性:

在此处输入图片说明

获取未处理的分区数

为了遍历块中的分区,我们首先要获得未处理的分区数。为此,您必须添加一个脚本任务。选择p_Cubep_Databasep_MeasureGroupp_ServerName变量为只读变量和intCount作为读写变量。

在此处输入图片说明

在脚本编辑器中,编写以下脚本:

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using Microsoft.AnalysisServices;
using System.Linq;
#endregion

namespace ST_e3da217e491640eca297900d57f46a85
{

    [Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {

        public void Main()
        {
            // TODO: Add your code here
            Server myServer = new Server();

            string ConnStr = Dts.Connections["ssas"].ConnectionString;
            myServer.Connect(ConnStr);

            Database db  = myServer.Databases.GetByName(Dts.Variables["p_Database"].Value.ToString());
            Cube objCube = db.Cubes.FindByName(Dts.Variables["p_Cube"].Value.ToString());
            MeasureGroup objMeasureGroup = objCube.MeasureGroups[Dts.Variables["p_MeasureGroup"].Value.ToString()];

            Dts.Variables["intCount"].Value = objMeasureGroup.Partitions.Cast<Partition>().Where(x => x.State != AnalysisState.Processed).Count();

            Dts.TaskResult = (int)ScriptResults.Success;
        }

        #region ScriptResults declaration

        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion

    }
}
Run Code Online (Sandbox Code Playgroud)

分块处理分区

最后一步是创建一个Forloop容器并对其进行配置,如下图所示:

在此处输入图片说明

  • InitExpression:@intCurrent = 0
  • EvalExpression:@intCurrent <@intCount
  • AssignExpression = @intCurrent + @p_MaxParallel

在For Loop容器内,添加一个脚本任务以准备XMLA查询,并添加两个Analysis Services处理任务,如下图所示:

在此处输入图片说明

在脚本任务,选择p_Cubep_Databasep_MaxParallelp_MeasureGroup为只读变量,并选择strProcessDatastrProcessIndexes如读写变量。

在此处输入图片说明

在脚本编辑器中,编写以下脚本:

脚本是准备分别处理分区数据和索引所需的XMLA命令

#region Namespaces
using System;
using System.Data;
using System.Data.SqlClient;
using Microsoft.SqlServer.Dts.Runtime;
using System.Linq;
using System.Windows.Forms;
using Microsoft.AnalysisServices;
#endregion

namespace ST_00ad89f595124fa7bee9beb04b6ad3d9
{

    [Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {


        public void Main()
        {
            Server myServer = new Server();

            string ConnStr = Dts.Connections["ssas"].ConnectionString;
            myServer.Connect(ConnStr);

            Database db = myServer.Databases.GetByName(Dts.Variables["p_Database"].Value.ToString());
            Cube objCube = db.Cubes.FindByName(Dts.Variables["p_Cube"].Value.ToString());
            MeasureGroup objMeasureGroup = objCube.MeasureGroups[Dts.Variables["p_MeasureGroup"].Value.ToString()];
            int maxparallel = (int)Dts.Variables["p_MaxParallel"].Value;


            int intcount = objMeasureGroup.Partitions.Cast<Partition>().Where(x => x.State != AnalysisState.Processed).Count();

            if (intcount > maxparallel)
            {
                intcount = maxparallel;
            }

            var partitions = objMeasureGroup.Partitions.Cast<Partition>().Where(x => x.State != AnalysisState.Processed).OrderBy(y => y.Name).Take(intcount);

            string strData, strIndexes;

            strData = "<Batch xmlns=\"http://schemas.microsoft.com/analysisservices/2003/engine\"> \r\n <Parallel MaxParallel=\"" + maxparallel.ToString() + "\"> \r\n";
            strIndexes = "<Batch xmlns=\"http://schemas.microsoft.com/analysisservices/2003/engine\"> \r\n <Parallel MaxParallel=\"" + maxparallel.ToString() + "\"> \r\n";

            string SQLConnStr = Dts.Variables["User::p_DatabaseConnection"].Value.ToString();



            foreach (Partition prt in partitions)
            {


                strData +=
                 "    <Process xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:ddl2=\"http://schemas.microsoft.com/analysisservices/2003/engine/2\" xmlns:ddl2_2=\"http://schemas.microsoft.com/analysisservices/2003/engine/2/2\" xmlns:ddl100_100=\"http://schemas.microsoft.com/analysisservices/2008/engine/100/100\" xmlns:ddl200=\"http://schemas.microsoft.com/analysisservices/2010/engine/200\" xmlns:ddl200_200=\"http://schemas.microsoft.com/analysisservices/2010/engine/200/200\" xmlns:ddl300=\"http://schemas.microsoft.com/analysisservices/2011/engine/300\" xmlns:ddl300_300=\"http://schemas.microsoft.com/analysisservices/2011/engine/300/300\" xmlns:ddl400=\"http://schemas.microsoft.com/analysisservices/2012/engine/400\" xmlns:ddl400_400=\"http://schemas.microsoft.com/analysisservices/2012/engine/400/400\"> \r\n " +
                 "      <Object> \r\n " +
                 "        <DatabaseID>" + db.Name + "</DatabaseID> \r\n " +
                 "        <CubeID>" + objCube.ID + "</CubeID> \r\n " +
                 "        <MeasureGroupID>" + objMeasureGroup.ID + "</MeasureGroupID> \r\n " +
                 "        <PartitionID>" + prt.ID + "</PartitionID> \r\n " +
                 "      </Object> \r\n " +
                 "      <Type>ProcessData</Type> \r\n " +
                 "      <WriteBackTableCreation>UseExisting</WriteBackTableCreation> \r\n " +
                 "    </Process> \r\n";

                strIndexes +=
                "    <Process xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:ddl2=\"http://schemas.microsoft.com/analysisservices/2003/engine/2\" xmlns:ddl2_2=\"http://schemas.microsoft.com/analysisservices/2003/engine/2/2\" xmlns:ddl100_100=\"http://schemas.microsoft.com/analysisservices/2008/engine/100/100\" xmlns:ddl200=\"http://schemas.microsoft.com/analysisservices/2010/engine/200\" xmlns:ddl200_200=\"http://schemas.microsoft.com/analysisservices/2010/engine/200/200\" xmlns:ddl300=\"http://schemas.microsoft.com/analysisservices/2011/engine/300\" xmlns:ddl300_300=\"http://schemas.microsoft.com/analysisservices/2011/engine/300/300\" xmlns:ddl400=\"http://schemas.microsoft.com/analysisservices/2012/engine/400\" xmlns:ddl400_400=\"http://schemas.microsoft.com/analysisservices/2012/engine/400/400\"> \r\n " +
                "      <Object> \r\n " +
                "        <DatabaseID>" + db.Name + "</DatabaseID> \r\n " +
                "        <CubeID>" + objCube.ID + "</CubeID> \r\n " +
                "        <MeasureGroupID>" + objMeasureGroup.ID + "</MeasureGroupID> \r\n " +
                "        <PartitionID>" + prt.ID + "</PartitionID> \r\n " +
                "      </Object> \r\n " +
                "      <Type>ProcessIndexes</Type> \r\n " +
                "      <WriteBackTableCreation>UseExisting</WriteBackTableCreation> \r\n " +
                "    </Process> \r\n";



            }

            strData += " </Parallel> \r\n</Batch>";
            strIndexes += " </Parallel> \r\n</Batch>";

            Dts.Variables["strProcessData"].Value = strData;
            Dts.Variables["strProcessIndexes"].Value = strIndexes;

            Dts.TaskResult = (int)ScriptResults.Success;
        }

        #region ScriptResults declaration

        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion

    }
}
Run Code Online (Sandbox Code Playgroud)

现在,打开两个Analysis Services处理任务并手动定义任何任务(仅用于验证任务)。然后转到expression并将strProcessData变量分配给ProcessingCommandsFirst Task中的属性,并将strProcessIndexes变量分配给ProcessingCommands

在此处输入图片说明

现在,您可以执行该程序包,如果发生错误,则仅当前批次将回滚(10个分区)。

可能的改善

您可以添加一些日志记录任务来跟踪软件包的进度,尤其是在处理大量分区时。


由于它包含有用的详细信息,因此我在个人博客上发布了此答案:

我还发表了一篇有关SQLShack的更多详细信息的文章: