持久功能中只有扇出(忘了)

alw*_*ing 3 c# azure azure-functions serverless azure-durable-functions

我有一个具有2个功能和一个存储队列的现有功能应用程序。F1由服务总线主题中的消息触发。对于收到的每个味精,F1计算一些子任务(T1,T2,...),这些子任务必须以不同的延迟量执行。例如-T1将在3分钟后触发,T2将在5分钟后触发,依此类推。F1将消息发布到具有适当可见性超时(以模拟延迟)的存储队列中,并且只要队列中可见消息,就会触发F2。一切正常。

我现在想将此应用程序迁移为使用“耐用功能”。F1现在仅启动协调器。协调器代码如下:

    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            var pnTask = context.CallActivityAsync("PerformSubTask", value);
            tasks.Add(pnTask);
        }

        //dont't await as we want to fire and forget. No fan-in!
        //await Task.WhenAll(tasks);
    }

    [FunctionName("PerformSubTask")]
    public async static Task Run([ActivityTrigger]TaskInfo info, TraceWriter log)
    {
         TimeSpan timeDifference = DateTime.UtcNow - info.Origin.ToUniversalTime();
         TimeSpan delay = TimeSpan.FromSeconds(info.DelayInSeconds);
         var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;

         //will still keep the activity function running and incur costs??
         await Task.Delay(actualDelay);

         //perform subtask work after delay! 
    }
Run Code Online (Sandbox Code Playgroud)

我只想扇出(不扇入收集结果)并启动子任务。协调器启动所有任务,并避免调用“ await Task.WhenAll”。活动功能调用“ Task.Delay”以等待指定的时间,然后执行其工作。

我的问题

  • 在此工作流程中使用持久功能是否有意义?
  • 这是编排“转出”工作流程的正确方法吗?
  • 我不喜欢活动功能在指定的时间(3或5分钟)内不执行任何操作的事实。会产生费用吗?
  • 同样,如果需要超过10分钟的延迟,则使用此方法也无法使活动功能成功!
  • 为了避免这种情况,我较早的尝试是在业务流程管理器中使用“ CreateTimer”,然后将活动添加为延续,但我在“历史记录”表中仅看到计时器条目。继续不触发!我是否违反了协调器代码约束 -“协调器代码绝不能启动任何异步操作”?

    foreach (var value in results)
    {
            //calculate time to start
            var timeToStart = ;
            var pnTask = context.CreateTimer(timeToStart , CancellationToken.None).ContinueWith(t => context.CallActivityAsync("PerformSubTask", value));
            tasks.Add(pnTask);
    }
    
    Run Code Online (Sandbox Code Playgroud)

    更新:使用克里斯建议的方法

    计算子任务和延迟的活动

    [FunctionName("CalculateTasks")]
    public static List<TaskInfo> CalculateTasks([ActivityTrigger]string input,TraceWriter log)
    {
        //in reality time is obtained by calling an endpoint 
        DateTime currentTime = DateTime.UtcNow;
        return new List<TaskInfo> {
            new TaskInfo{ DelayInSeconds = 10, Origin = currentTime },
            new TaskInfo{ DelayInSeconds = 20, Origin = currentTime },
            new TaskInfo{ DelayInSeconds = 30, Origin = currentTime },
        };
    }
    
    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        var currentTime = context.CurrentUtcDateTime;
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            TimeSpan timeDifference = currentTime - value.Origin;
            TimeSpan delay = TimeSpan.FromSeconds(value.DelayInSeconds);
            var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;
    
            var timeToStart = currentTime.Add(actualDelay);
    
            Task delayedActivityCall = context
                 .CreateTimer(timeToStart, CancellationToken.None)
                 .ContinueWith(t => context.CallActivityAsync("PerformSubtask", value));
            tasks.Add(delayedActivityCall);
        }
    
        await Task.WhenAll(tasks);
    }
    
    Run Code Online (Sandbox Code Playgroud)

简单地从协调器内部调度任务似乎可以工作。就我而言,我正在计算任务和循环之前另一个活动(CalculateTasks)中的延迟。我希望使用活动运行时的“当前时间”来计算延迟。我在活动中使用DateTime.UtcNow。在协调器中使用时,这种方式不能很好地发挥作用。“ ContinueWith”指定的活动不会运行,并且协调器始终处于“正在运行”状态。

我不能使用业务流程在协调器内部记录的时间吗?

更新2

因此,克里斯建议的解决方法有效!

由于我不想收集活动的结果,因此避免await Tasks.WhenAll(tasks)在安排所有活动之后调用“ ”。我这样做是为了减少控制队列上的争用,即如果需要的话,可以启动另一个业务流程。尽管如此,“协调器”的状态仍然为“ 正在运行 ”,直到所有活动完成运行。我猜想只有在最后一个活动向控制队列中发布“完成”消息后,它才会移至“ 完成 ”。

我对吗?有什么办法可以提早释放协调器,即在安排所有活动之后释放该协调器吗?

Chr*_*lum 5

ContinueWith方法对我来说很好。我可以使用以下协调器代码模拟您的方案的版本:

[FunctionName("Orchestrator")]
public static async Task Orchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context,
    TraceWriter log)
{
    var tasks = new List<Task>(10);
    for (int i = 0; i < 10; i++)
    {
        int j = i;
        DateTime timeToStart = context.CurrentUtcDateTime.AddSeconds(10 * j);
        Task delayedActivityCall = context
            .CreateTimer(timeToStart, CancellationToken.None)
            .ContinueWith(t => context.CallActivityAsync("PerformSubtask", j));
        tasks.Add(delayedActivityCall);
    }

    await Task.WhenAll(tasks);
}
Run Code Online (Sandbox Code Playgroud)

值得一提的是,这是活动功能代码。

[FunctionName("PerformSubtask")]
public static void Activity([ActivityTrigger] int j, TraceWriter log)
{
    log.Warning($"{DateTime.Now:o}: {j:00}");
}
Run Code Online (Sandbox Code Playgroud)

从日志输出中,我看到所有活动调用彼此之间相距10秒。

另一种方法是将其散布到多个子业务流程(如建议的@jeffhollan),这些业务流程很短,只有很短的时间,包括持久的计时器延迟和您的活动调用。

更新 我尝试使用您更新的样本,并且能够重现您的问题!如果您在Visual Studio中本地运行并将异常设置配置为始终在异常时中断,那么您应该看到以下内容:

System.InvalidOperationException:'检测到多线程执行。如果协调器功能代码等待不是由DurableOrchestrationContext方法创建的任务,则会发生这种情况。可以在本文https://docs.microsoft.com/zh-cn/azure/azure-functions/durable-functions-checkpointing-and-replay#orchestrator-code-constraints中找到更多详细信息。

这意味着其称为线程context.CallActivityAsync("PerformSubtask", j)一样的,要求协调器功能的线程。我不知道为什么我的最初示例没有做到这一点,或者为什么您的版本没有做到这一点。它与TPL如何决定使用哪个线程来运行您的ContinueWith委托有关-我需要进一步研究。

好消息是,有一个简单的解决方法,即指定TaskContinuationOptions.ExecuteSynchronously,如下所示:

Task delayedActivityCall = context
    .CreateTimer(timeToStart, CancellationToken.None)
    .ContinueWith(
        t => context.CallActivityAsync("PerformSubtask", j),
        TaskContinuationOptions.ExecuteSynchronously);
Run Code Online (Sandbox Code Playgroud)

请尝试尝试,让我知道是否可以解决您所观察到的问题。

理想情况下,使用时无需执行此解决方法Task.ContinueWith。我已经在GitHub中打开了一个问题来跟踪此问题:https : //github.com/Azure/azure-functions-durable-extension/issues/317

由于我不想收集活动的结果,因此避免await Tasks.WhenAll(tasks)在安排所有活动之后调用。我这样做是为了减少控制队列上的争用,即如果需要的话,可以启动另一个业务流程。但是,直到所有活动结束运行之前,“协调器”的状态仍为“正在运行”。我猜只有在上一个活动向控制队列中发布“完成”消息后,它才会移至“完成”。

这是预期的。在所有出色的持久性任务都完成之前,Orchestrator功能永远不会真正完成。没有任何方法可以解决此问题。请注意,您仍然可以启动其他协调器实例,如果它们恰好位于同一分区(默认情况下有4个分区),则可能会有争用。


jef*_*lan 0

我认为耐用对于这个工作流程来说绝对有意义。我确实认为最好的选择是利用延迟/计时器功能,正如您所说,但基于执行的同步性质,我认为我不会将所有内容添加到真正期待或.WhenAll().WhenAny()不想要的任务列表中t 的目标。我想我个人只会为每个任务执行一个带有计时器延迟的顺序 foreach 循环。所以伪代码:

for(int x = 0; x < results.Length; x++) { await context.CreateTimer(TimeSpan.FromMinutes(1), ...); await context.CallActivityAsync("PerformTaskAsync", results[x]); }

无论如何,您都需要在那里等待,因此只要避免这些等待就await Task.WhenAll(...)可能会导致上面的代码示例中的一些问题。希望有帮助