alw*_*ing 3 c# azure azure-functions serverless azure-durable-functions
我有一个具有2个功能和一个存储队列的现有功能应用程序。F1由服务总线主题中的消息触发。对于收到的每个味精,F1计算一些子任务(T1,T2,...),这些子任务必须以不同的延迟量执行。例如-T1将在3分钟后触发,T2将在5分钟后触发,依此类推。F1将消息发布到具有适当可见性超时(以模拟延迟)的存储队列中,并且只要队列中可见消息,就会触发F2。一切正常。
我现在想将此应用程序迁移为使用“耐用功能”。F1现在仅启动协调器。协调器代码如下:
public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
List<Task> tasks = new List<Task>();
foreach (var value in results)
{
var pnTask = context.CallActivityAsync("PerformSubTask", value);
tasks.Add(pnTask);
}
//dont't await as we want to fire and forget. No fan-in!
//await Task.WhenAll(tasks);
}
[FunctionName("PerformSubTask")]
public async static Task Run([ActivityTrigger]TaskInfo info, TraceWriter log)
{
TimeSpan timeDifference = DateTime.UtcNow - info.Origin.ToUniversalTime();
TimeSpan delay = TimeSpan.FromSeconds(info.DelayInSeconds);
var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;
//will still keep the activity function running and incur costs??
await Task.Delay(actualDelay);
//perform subtask work after delay!
}
Run Code Online (Sandbox Code Playgroud)
我只想扇出(不扇入收集结果)并启动子任务。协调器启动所有任务,并避免调用“ await Task.WhenAll”。活动功能调用“ Task.Delay”以等待指定的时间,然后执行其工作。
我的问题
为了避免这种情况,我较早的尝试是在业务流程管理器中使用“ CreateTimer”,然后将活动添加为延续,但我在“历史记录”表中仅看到计时器条目。继续不触发!我是否违反了协调器代码的约束 -“协调器代码绝不能启动任何异步操作”?
foreach (var value in results)
{
//calculate time to start
var timeToStart = ;
var pnTask = context.CreateTimer(timeToStart , CancellationToken.None).ContinueWith(t => context.CallActivityAsync("PerformSubTask", value));
tasks.Add(pnTask);
}
Run Code Online (Sandbox Code Playgroud)
更新:使用克里斯建议的方法
计算子任务和延迟的活动
[FunctionName("CalculateTasks")]
public static List<TaskInfo> CalculateTasks([ActivityTrigger]string input,TraceWriter log)
{
//in reality time is obtained by calling an endpoint
DateTime currentTime = DateTime.UtcNow;
return new List<TaskInfo> {
new TaskInfo{ DelayInSeconds = 10, Origin = currentTime },
new TaskInfo{ DelayInSeconds = 20, Origin = currentTime },
new TaskInfo{ DelayInSeconds = 30, Origin = currentTime },
};
}
public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
var currentTime = context.CurrentUtcDateTime;
List<Task> tasks = new List<Task>();
foreach (var value in results)
{
TimeSpan timeDifference = currentTime - value.Origin;
TimeSpan delay = TimeSpan.FromSeconds(value.DelayInSeconds);
var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;
var timeToStart = currentTime.Add(actualDelay);
Task delayedActivityCall = context
.CreateTimer(timeToStart, CancellationToken.None)
.ContinueWith(t => context.CallActivityAsync("PerformSubtask", value));
tasks.Add(delayedActivityCall);
}
await Task.WhenAll(tasks);
}
Run Code Online (Sandbox Code Playgroud)简单地从协调器内部调度任务似乎可以工作。就我而言,我正在计算任务和循环之前另一个活动(CalculateTasks)中的延迟。我希望使用活动运行时的“当前时间”来计算延迟。我在活动中使用DateTime.UtcNow。在协调器中使用时,这种方式不能很好地发挥作用。“ ContinueWith”指定的活动不会运行,并且协调器始终处于“正在运行”状态。
我不能使用业务流程在协调器内部记录的时间吗?
更新2
因此,克里斯建议的解决方法有效!
由于我不想收集活动的结果,因此避免await Tasks.WhenAll(tasks)在安排所有活动之后调用“ ”。我这样做是为了减少控制队列上的争用,即如果需要的话,可以启动另一个业务流程。尽管如此,“协调器”的状态仍然为“ 正在运行 ”,直到所有活动完成运行。我猜想只有在最后一个活动向控制队列中发布“完成”消息后,它才会移至“ 完成 ”。
我对吗?有什么办法可以提早释放协调器,即在安排所有活动之后释放该协调器吗?
该ContinueWith方法对我来说很好。我可以使用以下协调器代码模拟您的方案的版本:
[FunctionName("Orchestrator")]
public static async Task Orchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context,
TraceWriter log)
{
var tasks = new List<Task>(10);
for (int i = 0; i < 10; i++)
{
int j = i;
DateTime timeToStart = context.CurrentUtcDateTime.AddSeconds(10 * j);
Task delayedActivityCall = context
.CreateTimer(timeToStart, CancellationToken.None)
.ContinueWith(t => context.CallActivityAsync("PerformSubtask", j));
tasks.Add(delayedActivityCall);
}
await Task.WhenAll(tasks);
}
Run Code Online (Sandbox Code Playgroud)
值得一提的是,这是活动功能代码。
[FunctionName("PerformSubtask")]
public static void Activity([ActivityTrigger] int j, TraceWriter log)
{
log.Warning($"{DateTime.Now:o}: {j:00}");
}
Run Code Online (Sandbox Code Playgroud)
从日志输出中,我看到所有活动调用彼此之间相距10秒。
另一种方法是将其散布到多个子业务流程(如建议的@jeffhollan),这些业务流程很短,只有很短的时间,包括持久的计时器延迟和您的活动调用。
更新 我尝试使用您更新的样本,并且能够重现您的问题!如果您在Visual Studio中本地运行并将异常设置配置为始终在异常时中断,那么您应该看到以下内容:
System.InvalidOperationException:'检测到多线程执行。如果协调器功能代码等待不是由DurableOrchestrationContext方法创建的任务,则会发生这种情况。可以在本文https://docs.microsoft.com/zh-cn/azure/azure-functions/durable-functions-checkpointing-and-replay#orchestrator-code-constraints中找到更多详细信息。
这意味着其称为线程context.CallActivityAsync("PerformSubtask", j)是不一样的,要求协调器功能的线程。我不知道为什么我的最初示例没有做到这一点,或者为什么您的版本没有做到这一点。它与TPL如何决定使用哪个线程来运行您的ContinueWith委托有关-我需要进一步研究。
好消息是,有一个简单的解决方法,即指定TaskContinuationOptions.ExecuteSynchronously,如下所示:
Task delayedActivityCall = context
.CreateTimer(timeToStart, CancellationToken.None)
.ContinueWith(
t => context.CallActivityAsync("PerformSubtask", j),
TaskContinuationOptions.ExecuteSynchronously);
Run Code Online (Sandbox Code Playgroud)
请尝试尝试,让我知道是否可以解决您所观察到的问题。
理想情况下,使用时无需执行此解决方法Task.ContinueWith。我已经在GitHub中打开了一个问题来跟踪此问题:https : //github.com/Azure/azure-functions-durable-extension/issues/317
由于我不想收集活动的结果,因此避免
await Tasks.WhenAll(tasks)在安排所有活动之后调用。我这样做是为了减少控制队列上的争用,即如果需要的话,可以启动另一个业务流程。但是,直到所有活动结束运行之前,“协调器”的状态仍为“正在运行”。我猜只有在上一个活动向控制队列中发布“完成”消息后,它才会移至“完成”。
这是预期的。在所有出色的持久性任务都完成之前,Orchestrator功能永远不会真正完成。没有任何方法可以解决此问题。请注意,您仍然可以启动其他协调器实例,如果它们恰好位于同一分区(默认情况下有4个分区),则可能会有争用。
我认为耐用对于这个工作流程来说绝对有意义。我确实认为最好的选择是利用延迟/计时器功能,正如您所说,但基于执行的同步性质,我认为我不会将所有内容添加到真正期待或.WhenAll()您.WhenAny()不想要的任务列表中t 的目标。我想我个人只会为每个任务执行一个带有计时器延迟的顺序 foreach 循环。所以伪代码:
for(int x = 0; x < results.Length; x++) {
await context.CreateTimer(TimeSpan.FromMinutes(1), ...);
await context.CallActivityAsync("PerformTaskAsync", results[x]);
}
无论如何,您都需要在那里等待,因此只要避免这些等待就await Task.WhenAll(...)可能会导致上面的代码示例中的一些问题。希望有帮助
| 归档时间: |
|
| 查看次数: |
1505 次 |
| 最近记录: |