Azure Durable Function Activity 似乎运行了多次且未完成

jb9*_*b99 5 azure orchestration azure-durable-functions

我有一个耐用的功能扇出和输入模式,但似乎工作不可靠。每 10 分钟从 Timer 函数调用一次 Orchestration,但此后已将其增加到 20 分钟。使用 context.CallActivityAsync 调用 Activity 函数并返回一个整数(已处理的行数)。目前,传入的 workItems 应该只包含 2 个要处理的项目。第一项处理所有行并在日志中显示完整的行。第二项有时显示正在处理的行,但在某些时候它只是停止...没有其他活动被识别,并且“完成”永远不会显示在日志中。另外,第二个活动有时会显示它同时运行多次...我已经在我的开发机器上使用相同的数据尝试了这个确切的代码,并且它处理完成的时间不超过 5 分钟。我还将hosts.json 文件设置为

{
  "version": "2.0",
  "functionTimeout": "00:10:00",
  "extensions": {
    "queues": {
      "maxPollingInterval": "00:00:05"
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

编排:

public static async void RunOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
        {
            log.LogInformation($"************** Fanning out ********************");
            var parallelTasks = new List<Task<int>>();
            //object[] workBatch = await context.CallActivityAsync<object[]>("GETVendors", null);
            object[] workBatch = GETVendors(log); //work batch only has 2 items 
            for (int i = 0; i<workBatch.Length; i++)
            {
                Task<int> task = context.CallActivityAsync<int>("SynchVendor", workBatch[i]);
                parallelTasks.Add(task);
            }

            log.LogInformation($"************** 'Waiting' for parallel results ********************");
            await Task.WhenAll(parallelTasks);
            log.LogInformation($"************** All activity functions complete ********************");

            log.LogInformation($"************** fanning in ********************");
            int cnt = 0;
            foreach (var completedParallelActivity in parallelTasks)
            {
                cnt += completedParallelActivity.Result;
            }
            log.LogInformation($"Total Records Converted across all tasks = {cnt}");
            //return outputs;
        }
Run Code Online (Sandbox Code Playgroud)

活动功能

public static async Task<int> SynchVendor([ActivityTrigger] string vendor, ILogger log)
        {
            log.LogInformation($"SynchVendor {vendor}");


            string sqlStr = Environment.GetEnvironmentVariable("Sqldb_Connection");
            bool dev = Convert.ToBoolean(Environment.GetEnvironmentVariable("Dev"));
            int totalCount = 0;

            using (SqlConnection conn = new SqlConnection(sqlStr))
            {
                conn.Open();


                // lets synch the vendor


                Int32 limit = 200;
                bool initialLoad = false;
                int offset = 0;
                bool done = false;

                do
                {

                    //synch logic...
                    // if there are rows found to have changed then send them to a queue for further processing

                } while (!done);

                // we are done syncing a vendor write out the vendorinfo


                conn.Close();
            }
            log.LogInformation($"SynchVendor {vendor} Complete");
            return totalCount;
Run Code Online (Sandbox Code Playgroud)

kwi*_*ill 3

对于额外的日志记录,您需要将其添加到 log.Log*** 操作之前:

if (!context.IsReplaying) 
Run Code Online (Sandbox Code Playgroud)

请参阅https://learn.microsoft.com/en-us/sandbox/functions-recipes/durable-diagnostics#logging-in-orchestrator-functions了解更多信息。

对于永远看不到完成的问题,您可以执行以下操作:

  1. 你没有错误处理。如果您的活动函数抛出异常会发生什么?您应该使用带有日志的 Try/Catch 来让您知道何时出现故障。
  2. 功能日志显示什么?请注意,我不是在谈论 customDimensions.Category=="User" 的日志(即您的日志条目),而是在谈论 Functions 运行时执行的日志。在 appinsights 日志中,在适当的时间范围内运行“union traces | union exceptions”,以查看 Functions 运行时正在执行的操作。
  3. 尝试添加超时任务,以便即使其中一项任务未完成,您的协调器也能完成。请参阅https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-error-handling?tabs=csharp#function-timeouts