Jep*_*ppe 4 c# orchestration azure-functions azure-durable-functions
我正在尝试实施 Azure Durable Function 工作流。
每 6 分钟我就有一个 Azure TimerTrigger 函数调用一个 Azure 编排函数 (OrchestrationTrigger),它依次启动许多活动函数 (ActivityTrigger)。
然而,有时,Orchestration 函数会在几秒钟内被调用两次!这是一个大问题,因为我的活动函数不是幂等的!
下面是我的代码是如何被调用的。
定时器触发功能:
[FunctionName("StartupFunc")]
public static async Task Run([TimerTrigger("0 */6 * * * *", RunOnStartup = true, UseMonitor = false)]TimerInfo myStartTimer, [OrchestrationClient] DurableOrchestrationClient orchestrationClient, TraceWriter log)
{
List<OrchestrationModel> ExportModels = await getData();
string id = await orchestrationClient.StartNewAsync("OrchestratorFunc", ExportModels);
}
Run Code Online (Sandbox Code Playgroud)
编排功能:
[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
var dataList = context.GetInput<List<OrchestrationModel>>();
var tasks = new List<Task>();
foreach (var data in dataList)
{
tasks.Add(context.CallActivityAsync<string>("TransformToSql", new TransformModel(data));
}
await Task.WhenAll(tasks);
}
Run Code Online (Sandbox Code Playgroud)
活动功能:
[FunctionName("TransformToSql")]
[public static async Task<string> RunTransformation([ActivityTrigger] DurableActivityContext context, TraceWriter log)
{
TransformModel = context.GetInput<TransformModel>();
//Do some work with TransformModel
}
Run Code Online (Sandbox Code Playgroud)
这种行为非常好 - 这就是 Durable Functions 的设计方式。
您有以下编排:
[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
var dataList = context.GetInput<List<OrchestrationModel>>();
var tasks = new List<Task>();
foreach (var data in dataList)
{
tasks.Add(context.CallActivityAsync<string>("TransformToSql", new TransformModel(data));
}
await Task.WhenAll(tasks);
}
Run Code Online (Sandbox Code Playgroud)
当一个活动被调用时,流程返回到一个叫做Dispatcher的概念——它是 Durable Functions 的内部存在,负责维护你的编排流程。在等待任务完成时,编排会暂时解除分配。任务完成后,将重播整个编排,直到下一次await发生。
重要的是,尽管重播了编排,但不会再次调用活动 - 从存储中获取并使用其结果。为了使事情更明确,请考虑以下示例:
[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
var dataList = context.GetInput<List<OrchestrationModel>>();
var tasks = new List<Task>();
foreach (var data in dataList)
{
await context.CallActivityAsync<string>("TransformToSql1", new TransformModel(data);
await context.CallActivityAsync<string>("TransformToSql2", new TransformModel(data);
}
}
Run Code Online (Sandbox Code Playgroud)
当TransformToSql1等待时,业务流程被解除分配并且整个流程等待直到该活动完成。然后重播编排 - 它再次等待,TransformToSql1但由于它的结果已保存,它只是返回到编排并等待TransformToSql2- 然后重复该过程。
编排功能将更频繁地运行,因为它由持久功能框架重播。您是否看到您的活动函数也再次被触发?如果是这样的话,这确实是一种奇怪的行为。
Durable Functions 使用存储队列和表来控制流程并捕获编排的状态。
每次触发编排功能(通过从控制队列接收消息)时,它都会重播编排。您可以使用IsReplaying上的属性在代码中检查这一点DurableOrchestrationContext。
if (!context.IsReplaying)
{
// this code will only run during the first execution of the workflow
}
Run Code Online (Sandbox Code Playgroud)
不要使用IsReplaying来确定运行活动,而是将其用于日志记录/调试目的。
当到达活动函数时,一条消息将被放入工作项队列中。然后,Durable Functions 框架将查看历史表,以确定该活动函数之前是否已运行过(使用给定的参数)。如果是,则它将不会再次运行活动函数,并将继续执行编排函数的其余部分。
Durable Functions 的检查点和重放功能仅在编排代码具有确定性时才起作用。因此,切勿使用基于日期时间或随机数的决策。
更多信息:https://learn.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay
| 归档时间: |
|
| 查看次数: |
4349 次 |
| 最近记录: |