Azure 应用服务和 .NET Core 3.1 中长时间运行计算的合适解决方案?

H2O*_*aCl 12 azure long-running-processes asp.net-core azure-appservice

在不需要数据库且无需对此应用程序之外的任何内容进行 IO 的应用程序中,在 Azure 应用服务和 .NET Core 3.1 中长时间运行计算的合适解决方案是什么?这是一个计算任务。

具体来说,以下是不可靠的,需要一个解决方案。

[Route("service")]
[HttpPost]
public Outbound Post(Inbound inbound)
{
    Debug.Assert(inbound.Message.Equals("Hello server."));
    Outbound outbound = new Outbound();
    long Billion = 1000000000;
    for (long i = 0; i < 33 * Billion; i++) // 230 seconds
        ;
    outbound.Message = String.Format("The server processed inbound object.");
    return outbound;
}
Run Code Online (Sandbox Code Playgroud)

这有时会返回一个空对象HttpClient(未显示)。较小的工作量总是会成功。例如,30 亿次迭代总是成功的。一个更大的数字会很好,特别是 2400 亿是一个要求。

我认为在 2020 年,带有 .NET Core 的 Azure 应用服务的一个合理目标可能是在 8 个子线程的帮助下将父线程数提高到 2400 亿,因此每个子线程数达到 300 亿,父线程划分 8 M 字节入站对象转换为入站到每个子项的较小对象。每个子进程收到一个 1 M 字节的入站数据,并将 1 M 字节的出站数据返回给父级。父节点将结果重新组装成 8 M 字节的出站。

显然,经过的时间将是单线程实现所需时间的 12.5%,或 1/8,或八分之一。与计算时间相比,切割和重新组装对象的时间很小。我假设传输对象的时间与计算时间相比非常小,因此 12.5% 的期望值大致准确。

如果我能得到 4 或 8 个内核,那就太好了。如果我能得到线程,让我说一个核心周期的 50%,那么我可能需要 8 或 16 个线程。如果每个线程给我 33% 的内核周期,那么我需要 12 或 24 个线程。

我正在考虑BackgroundService上课,但我正在寻找确认这是正确的方法。微软说...

BackgroundService is a base class for implementing a long running IHostedService.
Run Code Online (Sandbox Code Playgroud)

显然,如果某些东西长时间运行,最好通过使用多核通过让它更快地完成,System.Threading但是这个文档似乎System.Threading只在通过 启动任务的上下文中提到System.Threading.Timer。我的示例代码显示我的应用程序中不需要计时器。HTTP POST 将作为工作的机会。通常我会System.Threading.Thread用来实例化多个对象以使用多个内核。我发现在需要很长时间的工作解决方案的上下文中,没有提及多核是一个明显的遗漏,但可能有某种原因 Azure 应用服务不处理这个问题。也许我只是无法在教程和文档中找到它。

任务的启动是图示的 HTTP POST 控制器。假设最长的作业需要 10 分钟。HTTP 客户端(未显示)将超时限制设置为 1000 秒,这远远超过 10 分钟(600 秒),以便有安全边际。HttpClient.Timeout是相关属性。目前我认为 HTTP 超时是一个真正的限制;而不是某种非约束性(假限制),这样一些其他约束会导致用户等待 9 分钟并收到错误消息。一个真正的绑定限制是一个我可以说“但对于这个超时它会成功”的限制。如果 HTTP 超时不是真正的绑定限制,并且还有其他限制系统的东西,我可以调整我的 HTTP 控制器,改为使用三 (3) 个 POST 方法。因此 POST1 意味着使用入站对象启动任务。POST2 的意思是告诉我它是否完成了。POST3 意味着给我出站对象。

在不需要数据库且无需对此应用程序之外的任何内容进行 IO 的应用程序中,在 Azure 应用服务和 .NET Core 3.1 中长时间运行计算的合适解决方案是什么?这是一个计算任务。

And*_*ndy 20

序幕

几年前遇到了一个非常相似的问题。我们需要一种可以处理大量数据的服务。有时处理需要 10 秒,有时可能需要一个小时。

起初,我们按照您的问题说明了这一点:向服务发送请求,服务处理请求中的数据并在完成后返回响应。

手头的问题

当作业只需要大约一分钟或更短的时间时,这很好,但是超过此时间,服务器将关闭会话并且调用者将报告错误。

服务器在放弃请求之前有大约 2 分钟的默认时间来产生响应。它不会退出请求的处理......但它会退出 HTTP 会话。你在你的 上设置什么参数并不重要HttpClient,服务器是委托多长时间太长的那个。

问题原因

这一切都是有充分理由的。服务器套接字非常昂贵。你的数量有限。服务器试图通过切断时间超过指定时间的请求来保护您的服务,以避免套接字饥饿问题。

通常,您希望 HTTP 请求只需要几毫秒。如果它们花费的时间比这更长,并且您的服务必须以高速率满足其他请求,您最终会遇到套接字问题。

解决方案

我们决定走 的路线IHostedService,特别是BackgroundService. 我们将此服务与队列结合使用。通过这种方式,您可以设置一个作业队列,并且BackgroundService一次处理它们(在某些情况下,我们有一次处理多个队列项目的服务,在其他情况下,我们水平扩展产生两个或更多队列)。

为什么 ASP.NET Core 服务运行BackgroundService? 我想在不与任何特定于 Azure 的构造紧密耦合的情况下处理这个问题,以防我们需要从 Azure 转移到其他一些云服务(回到那天,我们出于其他原因考虑这样做。)

这对我们来说效果很好,从那以后我们没有看到任何问题。

工作流程是这样的:

  1. 调用者向服务发送带有一些参数的请求
  2. 服务生成“作业”对象并通过 202(已接受)响应立即返回 ID
  3. 服务将此作业放入由 BackgroundService
  4. 调用者可以使用此作业 ID 查询作业状态并获取有关已完成多少以及剩余多少的信息
  5. 服务完成作业,将作业置于“已完成”状态并返回等待队列以产生更多作业

请记住,您的服务能够在运行多个实例的情况下进行水平扩展。在这种情况下,我使用 Redis 缓存来存储作业的状态,以便所有实例共享相同的状态。

如果您没有可用的 Redis 缓存,我还添加了“内存缓存”选项以在本地测试内容。您可以在服务器上运行“Memory Cache”服务,只要知道如果它扩展,那么您的数据就会不一致。

例子

由于我已婚并有孩子,所以在每个人都上床睡觉后的周五晚上我真的不会做太多事情,所以我花了一些时间整理了一个示例,您可以尝试一下。完整的解决方案也可供您试用。

QueuedBackgroundService.cs

此类实现有两个特定目的:一个是从队列中读取(BackgroundService实现),另一个是写入队列(IQueuedBackgroundService实现)。

    public interface IQueuedBackgroundService
    {
        Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters);
    }

    public sealed class QueuedBackgroundService : BackgroundService, IQueuedBackgroundService
    {
        private sealed class JobQueueItem
        {
            public string JobId { get; set; }
            public JobParametersModel JobParameters { get; set; }
        }

        private readonly IComputationWorkService _workService;
        private readonly IComputationJobStatusService _jobStatusService;

        // Shared between BackgroundService and IQueuedBackgroundService.
        // The queueing mechanism could be moved out to a singleton service. I am doing
        // it this way for simplicity's sake.
        private static readonly ConcurrentQueue<JobQueueItem> _queue =
            new ConcurrentQueue<JobQueueItem>();
        private static readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

        public QueuedBackgroundService(IComputationWorkService workService,
            IComputationJobStatusService jobStatusService)
        {
            _workService = workService;
            _jobStatusService = jobStatusService;
        }

        /// <summary>
        /// Transient method via IQueuedBackgroundService
        /// </summary>
        public async Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters)
        {
            var jobId = await _jobStatusService.CreateJobAsync(jobParameters).ConfigureAwait(false);
            _queue.Enqueue(new JobQueueItem { JobId = jobId, JobParameters = jobParameters });
            _signal.Release(); // signal for background service to start working on the job
            return new JobCreatedModel { JobId = jobId, QueuePosition = _queue.Count };
        }

        /// <summary>
        /// Long running task via BackgroundService
        /// </summary>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while(!stoppingToken.IsCancellationRequested)
            {
                JobQueueItem jobQueueItem = null;
                try
                {
                    // wait for the queue to signal there is something that needs to be done
                    await _signal.WaitAsync(stoppingToken).ConfigureAwait(false);

                    // dequeue the item
                    jobQueueItem = _queue.TryDequeue(out var workItem) ? workItem : null;

                    if(jobQueueItem != null)
                    {
                        // put the job in to a "processing" state
                        await _jobStatusService.UpdateJobStatusAsync(
                            jobQueueItem.JobId, JobStatus.Processing).ConfigureAwait(false);

                        // the heavy lifting is done here...
                        var result = await _workService.DoWorkAsync(
                            jobQueueItem.JobId, jobQueueItem.JobParameters,
                            stoppingToken).ConfigureAwait(false);

                        // store the result of the work and set the status to "finished"
                        await _jobStatusService.StoreJobResultAsync(
                            jobQueueItem.JobId, result, JobStatus.Success).ConfigureAwait(false);
                    }
                }
                catch(TaskCanceledException)
                {
                    break;
                }
                catch(Exception ex)
                {
                    try
                    {
                        // something went wrong. Put the job in to an errored state and continue on
                        await _jobStatusService.StoreJobResultAsync(jobQueueItem.JobId, new JobResultModel
                        {
                            Exception = new JobExceptionModel(ex)
                        }, JobStatus.Errored).ConfigureAwait(false);
                    }
                    catch(Exception)
                    {
                        // TODO: log this
                    }
                }
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

它是这样注入的:

    services.AddHostedService<QueuedBackgroundService>();
    services.AddTransient<IQueuedBackgroundService, QueuedBackgroundService>();
Run Code Online (Sandbox Code Playgroud)

计算控制器.cs

用于读/写作业的控制器如下所示:

    [ApiController, Route("api/[controller]")]
    public class ComputationController : ControllerBase
    {
        private readonly IQueuedBackgroundService _queuedBackgroundService;
        private readonly IComputationJobStatusService _computationJobStatusService;

        public ComputationController(
            IQueuedBackgroundService queuedBackgroundService,
            IComputationJobStatusService computationJobStatusService)
        {
            _queuedBackgroundService = queuedBackgroundService;
            _computationJobStatusService = computationJobStatusService;
        }

        [HttpPost, Route("beginComputation")]
        [ProducesResponseType(StatusCodes.Status202Accepted, Type = typeof(JobCreatedModel))]
        public async Task<IActionResult> BeginComputation([FromBody] JobParametersModel obj)
        {
            return Accepted(
                await _queuedBackgroundService.PostWorkItemAsync(obj).ConfigureAwait(false));
        }

        [HttpGet, Route("computationStatus/{jobId}")]
        [ProducesResponseType(StatusCodes.Status200OK, Type = typeof(JobModel))]
        [ProducesResponseType(StatusCodes.Status404NotFound, Type = typeof(string))]
        public async Task<IActionResult> GetComputationResultAsync(string jobId)
        {
            var job = await _computationJobStatusService.GetJobAsync(jobId).ConfigureAwait(false);
            if(job != null)
            {
                return Ok(job);
            }
            return NotFound($"Job with ID `{jobId}` not found");
        }

        [HttpGet, Route("getAllJobs")]
        [ProducesResponseType(StatusCodes.Status200OK,
            Type = typeof(IReadOnlyDictionary<string, JobModel>))]
        public async Task<IActionResult> GetAllJobsAsync()
        {
            return Ok(await _computationJobStatusService.GetAllJobsAsync().ConfigureAwait(false));
        }

        [HttpDelete, Route("clearAllJobs")]
        [ProducesResponseType(StatusCodes.Status200OK)]
        [ProducesResponseType(StatusCodes.Status401Unauthorized)]
        public async Task<IActionResult> ClearAllJobsAsync([FromQuery] string permission)
        {
            if(permission == "this is flakey security so this can be run as a public demo")
            {
                await _computationJobStatusService.ClearAllJobsAsync().ConfigureAwait(false);
                return Ok();
            }
            return Unauthorized();
        }
    }
Run Code Online (Sandbox Code Playgroud)

工作示例

只要这个问题是活跃的,我就会维护一个您可以尝试的工作示例。对于此特定示例,您可以指定要运行的迭代次数。为了模拟长时间运行的工作,每次迭代为 1 秒。因此,如果您将迭代值设置为 60,它将运行该作业 60 秒。

在它运行时,运行computationStatus/{jobId}getAllJobs端点。您可以实时查看所有工作更新。

这个例子远不是一个功能齐全的、涵盖所有边缘情况的、成熟的、可以投入生产的例子,但它是一个好的开始。

结论

在后端工作了几年后,我看到很多问题都是由于不了解后端的所有“规则”而产生的。希望这个答案能够对我过去遇到的问题有所了解,并希望这可以使您不必处理上述问题。


Noa*_*ahl 2

一种选择是尝试Azure Durable Functions,它更适合长时间运行的作业,这些作业保证检查点和状态,而不是尝试在触发请求的上下文中完成。它还具有扇出/扇入的概念,以防您所描述的内容可以分为具有聚合结果的较小作业。

如果目标只是原始计算,Azure Batch可能是更好的选择,因为它有助于扩展。