Azure 持久编排函数触发两次

Jep*_*ppe 4 c# orchestration azure-functions azure-durable-functions

我正在尝试实施 Azure Durable Function 工作流。

每 6 分钟我就有一个 Azure TimerTrigger 函数调用一个 Azure 编排函数 (OrchestrationTrigger),它依次启动许多活动函数 (ActivityTrigger)。

然而,有时,Orchestration 函数会在几秒钟内被调用两次!这是一个大问题,因为我的活动函数不是幂等的!

下面是我的代码是如何被调用的。

定时器触发功能:

[FunctionName("StartupFunc")]
public static async Task Run([TimerTrigger("0 */6 * * * *", RunOnStartup = true, UseMonitor = false)]TimerInfo myStartTimer, [OrchestrationClient] DurableOrchestrationClient orchestrationClient, TraceWriter log)
{
    List<OrchestrationModel> ExportModels = await getData();

    string id = await orchestrationClient.StartNewAsync("OrchestratorFunc", ExportModels);
}
Run Code Online (Sandbox Code Playgroud)

编排功能:

[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
    var dataList = context.GetInput<List<OrchestrationModel>>();
    var tasks = new List<Task>();

    foreach (var data in dataList)
    {
        tasks.Add(context.CallActivityAsync<string>("TransformToSql", new TransformModel(data));
    }
    await Task.WhenAll(tasks);
}
Run Code Online (Sandbox Code Playgroud)

活动功能:

[FunctionName("TransformToSql")]
[public static async Task<string> RunTransformation([ActivityTrigger] DurableActivityContext context, TraceWriter log)
{
    TransformModel = context.GetInput<TransformModel>();

    //Do some work with TransformModel
}
Run Code Online (Sandbox Code Playgroud)

kam*_*lod 7

这种行为非常好 - 这就是 Durable Functions 的设计方式。

您有以下编排:

[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
    var dataList = context.GetInput<List<OrchestrationModel>>();
    var tasks = new List<Task>();

    foreach (var data in dataList)
    {
        tasks.Add(context.CallActivityAsync<string>("TransformToSql", new TransformModel(data));
    }
    await Task.WhenAll(tasks);
}
Run Code Online (Sandbox Code Playgroud)

当一个活动被调用时,流程返回到一个叫做Dispatcher的概念——它是 Durable Functions 的内部存在,负责维护你的编排流程。在等待任务完成时,编排会暂时解除分配。任务完成后,将重播整个编排,直到下一次await发生。

重要的是,尽管重播了编排,但不会再次调用活动 - 从存储中获取并使用其结果。为了使事情更明确,请考虑以下示例:

[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
    var dataList = context.GetInput<List<OrchestrationModel>>();
    var tasks = new List<Task>();

    foreach (var data in dataList)
    {
        await context.CallActivityAsync<string>("TransformToSql1", new TransformModel(data);
        await context.CallActivityAsync<string>("TransformToSql2", new TransformModel(data);
    }
}
Run Code Online (Sandbox Code Playgroud)

TransformToSql1等待时,业务流程被解除分配并且整个流程等待直到该活动完成。然后重播编排 - 它再次等待,TransformToSql1但由于它的结果已保存,它只是返回到编排并等待TransformToSql2- 然后重复该过程。

  • 我知道活动的重播功能,而且工作正常,没有任何问题。我的问题是 Orchestrator 功能的两个并发执行。 (2认同)

Mar*_*arc 7

编排功能将更频繁地运行,因为它由持久功能框架重播。您是否看到您的活动函数也再次被触发?如果是这样的话,这确实是一种奇怪的行为。

Durable Functions 使用存储队列和表来控制流程并捕获编排的状态。

每次触发编排功能(通过从控制队列接收消息)时,它都会重播编排。您可以使用IsReplaying上的属性在代码中检查这一点DurableOrchestrationContext

if (!context.IsReplaying)
{
   // this code will only run during the first execution of the workflow
}
Run Code Online (Sandbox Code Playgroud)

不要使用IsReplaying来确定运行活动,而是将其用于日志记录/调试目的。

当到达活动函数时,一条消息将被放入工作项队列中。然后,Durable Functions 框架将查看历史表,以确定该活动函数之前是否已运行过(使用给定的参数)。如果是,则它将不会再次运行活动函数,并将继续执行编排函数的其余部分。

Durable Functions 的检查点和重放功能仅在编排代码具有确定性时才起作用。因此,切勿使用基于日期时间或随机数的决策。

更多信息:https://learn.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay