SSIS将Azure blob中的CSV加载到Azure SQL的步骤

Sco*_*can 6 sql-server csv ssis azure azure-storage-blobs

我需要连接到Azure blob(源)中的CSV文件,然后将数据加载到Azure SQL Server表中,然后将CSV文件移动到其他(存档)Azure blob.

如果没有Azure,我会创建一个到本地文件的平面文件连接,然后Data Flow task使用Source Assistant&Destination Assistant将数据加载到SQL Server表中,然后使用File System taskin Control Flow将文件移动到Archive目录.

我想做类似的事情,直接连接到Azure blob中的文件,然后在数据流任务之后,执行文件从一个Azure blob(源)移动到另一个Azure Blob(存档).

我能想到的最好的方法是使用Azure Blob Download task将CSV移动到运行SSIS的Azure VM(顺便说一句,你可以在没有VM的情况下获得Azure SSIS服务吗?),然后在下载后创建一个flat file connection&Data Flow来加载数据,然后执行Azure Blob Upload taskArchive Blob.

似乎应该有一种方法可以连接到源Azure blob文件并直接从中读取,而无需先下载它.同样,似乎应该有一种方法在Azure blob容器之间移动文件.我能想到的最好的是下载/上传选项,但这需要一个中间位置(本地目录),并且在下载后不会删除源文件.是否有SSIS Azure工具来执行这些任务?

Bru*_*hen 6

对于另一半问题的任何想法:在处理它们之后将文件从Source blob移动到Archive blob?

据我所知,没有任何内置任务可以帮助您实现此目的.基于我的测试,我假设您可以利用脚本任务并编写代码(VB或C#)来直接处理blob.以下是我的详细步骤,您可以参考它们:

1)使用Azure的斑点源OLE DB目标的数据流,用于加载从Azure的斑点一个CSV文件导入到SQL Azure的数据库.

在此输入图像描述

2)将CSV数据成功加载到SQL表后,使用脚本任务将源blob移动到存档blob.

在此输入图像描述

我将使用Container SAS令牌调用Blob Service REST API 副本BlobDelete Blob,您可以利用Microsoft Azure Storage Explorer并按照此官方教程为您的Blob容器生成SAS令牌.

假设源blob和目的地斑点是相同的容器下,然后我加了三个变量(SourceBlobUrl,ContainerSasToken,ArchiveBlobUrl)如下,并将其添加为ReadOnlyVariables 脚本任务编辑器,你可以参考本教程在脚本任务中使用变量.

在此输入图像描述

单击"脚本任务编辑器"下的" 编辑脚本"按钮以启动用于编写自定义脚本的VSTA开发环境.这是下面的Main方法ScriptMain.cs:

public async void Main()
{
    // TODO: Add your code here
    string sasToken = Dts.Variables["ContainerSasToken"].Value.ToString();
    string sourceBlobUrl = Dts.Variables["SourceBlobUrl"].Value.ToString();
    string archiveBlobUrl = Dts.Variables["ArchiveBlobUrl"].Value.ToString();

    try
    {
        HttpClient client = new HttpClient();
        client.DefaultRequestHeaders.Add("x-ms-copy-source", sourceBlobUrl + sasToken);
        //copy source blob to archive blob
        Dts.Log($"start copying blob from [{sourceBlobUrl}] to [{archiveBlobUrl}]...", 0, new byte[0]);
        HttpResponseMessage response = await client.PutAsync(archiveBlobUrl + sasToken, null);
        if (response.StatusCode == HttpStatusCode.Accepted || response.StatusCode == HttpStatusCode.Created)
        {
            client.DefaultRequestHeaders.Clear();
            Dts.Log($"start deleting blob [{sourceBlobUrl}]...", 0, new byte[0]);
            //delete source blob
            HttpResponseMessage result = await client.DeleteAsync(sourceBlobUrl + sasToken);
            if (result.StatusCode == HttpStatusCode.Accepted || result.StatusCode == HttpStatusCode.Created)
            {
                Dts.TaskResult = (int)ScriptResults.Success;
                return;
            }
        }
        Dts.TaskResult = (int)ScriptResults.Failure;
    }
    catch (Exception ex)
    {
        Dts.Events.FireError(-1, "Script Task - Move source blob to an archive blob", ex.Message + "\r" + ex.StackTrace, String.Empty, 0);
        Dts.TaskResult = (int)ScriptResults.Failure;
    }
}
Run Code Online (Sandbox Code Playgroud)

结果

在此输入图像描述 在此输入图像描述

此外,您还可以利用Microsoft Azure存储客户端库来访问存储blob,此时,您需要在不在GAC中的SSIS脚本任务中加载程序集,有关详细信息,请参阅此官方博客.