如何执行Azure Queue触发的大量持久功能?

Tar*_*huk 9 c# azure azure-functions

如果简单,我们的任务是处理大量输入消息.为了解决这个问题,我们决定使用Azure队列存储和Azure功能.我们有Azure Functions结构,类似于以下代码:

队列触发功能:

[FunctionName("MessageControllerExecutor")]
public static async void Run(
    [QueueTrigger(QUEUE_NAME, Connection = QUEUE_CONNECTION_NAME)]string queueMessage,
    [OrchestrationClient] DurableOrchestrationClient client,
    TraceWriter log)
{
    await client.StartNewAsync("MessageController", queueMessage);
}
Run Code Online (Sandbox Code Playgroud)

耐用功能:

[FunctionName("MessageController")]
public static async void Run(
    [OrchestrationTrigger] DurableOrchestrationContext context,
    TraceWriter log)
{
    if (!context.IsReplaying) log.Warning("MessageController started");

    var function1ResultTask = context.CallActivityAsync<ResultMessage>("Function_1", new InputMessage());
    var function2ResultTask = context.CallActivityAsync<ResultMessage>("Function_2", new InputMessage());

    await Task.WhenAll(function1ResultTask, function2ResultTask);

    // process Function_1 and Function_2 results
    // ...
}
Run Code Online (Sandbox Code Playgroud)

简单活动功能样本:

[FunctionName("Function_1")]
public static ResultMessage Run(
    [ActivityTrigger] DurableActivityContext activityContext,
    TraceWriter log)
{
    var msg = activityContext.GetInput<InputMessage>();
    int time = new Random().Next(1, 3);
    Thread.Sleep(time * 1000);

    return new ResultMessage()
    {
        Payload = $"Function_1 slept for {time} sec"
    };
}
Run Code Online (Sandbox Code Playgroud)

在队列上收到新项目时触发MessageControllerExecutor.MessageController是一个持久的函数,它使用很少的简单活动函数来处理每条消息.

当我们将消息推送到队列时,MessageControllerExecutor函数立即启动并异步启动,触发MessageController并传递消息,因此它按预期工作.但我们面临的问题是并非所有MessageController函数都运行.例如,我们将100条消息推送到队列中,但MessageController仅处理了大约10-20%的消息.有些消息未经处理或延迟处理.虽然没有任何异常被抛出,但看起来耐用函数无法启动.

所以,我们有几个问题:

  1. 具有队列触发和持久功能的此解决方案是否可以正确处理消息队列,还是有更好的方法可以通过队列触发持久功能?
  2. 运行持久功能有任何限制吗?
  3. 可以在同一时间执行多少个持久功能?

Dre*_*rsh 5

  1. 是的,这是一个完全有效的方式来启动编排!
  2. 当然,这里有一些与性能和可扩展性相关的架构细节.
  3. 你在这里想要问的是:一个持久的函数定义可以同时执行多少个编排实例?这确实是一个非常重要的理解方面.业务流程函数本身就是单线程,并且按照我上面提到的那个链接,在一组控制队列之间进行了平衡.您可以阅读该文档以获取更多信息,但最重要的是您不希望在业务流程功能中执行除实际业务流程之外的任何工作,因为它们是您对可伸缩性的限制.它是业务流程操作函数,其行为与任何其他Azure功能一样,并且对其可伸缩性几乎没有限制.

为了简洁起见,你确实在上面的问题中忽略了编排触发器中的一些代码,但是我明白你在做什么await Task.WhenAll(...)呢?如果它包含任何类型的重要处理,你应该真正将其转化为第三个动作函数(例如Function_3),然后简单地从编排功能返回结果.

更新:我刚刚注意到你的功能被定义为async void.如果我不得不猜测,这实际上会导致运行时出现问题.您可以尝试将其更改为async Task并查看您的问题是否消失?作为定义async void.NET中的方法的一般规则.

  • 感谢更新。实际上,在获取结果时,我们的 MessageController 中存在并发问题(从 `var function1Result = function1ResultTask.Result` 更改为 `var function1Result = await function1ResultTask`)。此外,我们将 MessageController 从 `async void` 更改为 `async Task`,所以现在一切都按预期工作。 (2认同)