在长时间运行的 Azure 数据工厂管道上发出警报的方法

Dhr*_*shi 5 azure-monitoring azure-data-factory azure-log-analytics

我有一些数据工厂管道,当将数据从 blob 复制到 SQL 时,有时可能会运行超过 2 小时。时间段是可变的,但我希望在任何管道运行超过 2 小时时收到通知/警报。

有哪些可能的方法可以做到这一点?

到目前为止我已经尝试过:

  • 探索了可以在其上放置警报规则的 adf 指标。但似乎没有人谈论主动跑步的持续时间。
  • 我希望获得管道的持续时间值,就像我们在 adf.azure.com 的“监视器”选项卡上看到的那样,并使用它来发出某种警报。
  • 我还在想,如果我可以获得管道启动时间,那么也许我可以从当前时间计算总运行时间,并在此基础上添加一些警报。

在此输入图像描述

Joe*_*ran 3

我们这样做是为了跟踪正在运行的管道并管理执行并发性。我发现逻辑应用和 Azure Functions 是创建此类解决方案的绝佳工具。以下是我们如何处理此问题的粗略概述:

  1. 一组利用 Microsoft.Azure.Management.DataFactory SDK 的Azure Functions (AF) 。相关代码位于本文底部。
  2. SQL Server 表中管道执行的日志。该表包含 PipelineId 和 Status 以及一些其他信息。每当您创建管道时,您都需要插入到该表中。我们使用一个单独的逻辑应用程序来调用 AF,使用下面代码中的“RunPipelineAsync”方法来执行管道,捕获新的 PipelineId (RunId),并将其发送到存储过程以记录 PipelineId。
  3. 在重复触发器(每 3 分钟)上运行的逻辑应用,a) 调用一个存储过程,该存储过程轮询表(上面的#2)并返回 Status =“InProgress”的所有管道;b) foreach 返回的列表并调用 AF(上面的 #1),该 AF 使用下面代码中的“GetPipelineInfoAsync”方法检查管道的当前状态;c) 调用另一个存储过程来更新表中的状态。

您可以执行与此类似的操作,并使用“DurationInMS”根据状态 =“InProgress”和总运行时间 > {所需警报阈值}生成适当的操作。

这是我使用的 DataFactoryHelper 类:

using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Microsoft.Rest;
using Microsoft.Azure.Management.ResourceManager;
using Microsoft.Azure.Management.DataFactory;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace AzureUtilities.DataFactory
{
    public class DataFactoryHelper
    {
        private ClientCredential Credentials { get; set; }
        private string KeyVaultUrl { get; set; }
        private string TenantId { get; set; }
        private string SubscriptionId { get; set; }

        private DataFactoryManagementClient _client = null;
        private DataFactoryManagementClient Client
        {
            get {
                if (_client == null)
                {
                    var context = new AuthenticationContext("https://login.windows.net/" + TenantId);
                    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", Credentials).Result;
                    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
                    _client = new DataFactoryManagementClient(cred) { SubscriptionId = SubscriptionId };
                }

                return _client;
            }
        }

        public DataFactoryHelper(string servicePrincipalId, string servicePrincipalKey, string tenantId, string subscriptionId)
        {
            Credentials = new ClientCredential(servicePrincipalId, servicePrincipalKey);
            TenantId = tenantId;
            SubscriptionId = subscriptionId;
        }

        public async Task<string> RunPipelineAsync(string resourceGroupName,
                                                   string dataFactoryName,
                                                   string pipelineName,
                                                   Dictionary<string, object> parameters = null,
                                                   Dictionary<string, List<string>> customHeaders = null)
        {
            var runResponse = await Client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroupName, dataFactoryName, pipelineName, parameters: parameters , customHeaders: customHeaders);
            return runResponse.Body.RunId;
        }

        public async Task<object> GetPipelineInfoAsync(string resourceGroup, string dataFactory, string runId)
        {
            var info = await Client.PipelineRuns.GetAsync(resourceGroup, dataFactory, runId);
            return new
            {
                RunId = info.RunId,
                PipelineName = info.PipelineName,
                InvokedBy = info.InvokedBy.Name,
                LastUpdated = info.LastUpdated,
                RunStart = info.RunStart,
                RunEnd = info.RunEnd,
                DurationInMs = info.DurationInMs,
                Status = info.Status,
                Message = info.Message
            };
        }
    }
}
Run Code Online (Sandbox Code Playgroud)