Using Parallel.ForEach on a Small Azure instance

Yar*_*evi 2 file-upload azure azure-storage task-parallel-library azure-storage-blobs

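I have a WebRole running on a Small instance. This WebRole has a method that uploads a large number of files to blob storage. According to the Azure instance specs, a Small instance has only one core. So when uploading these blobs, will Parallel.ForEach give me any benefit over a regular foreach?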

Dre*_*rsh 5

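You would be much better served by focusing on the async versions of the blob storage APIs and/or the Stream APIs, so that you are I/O bound rather than CPU bound. Anywhere there is a BeginXXX API, you should wrap it with Task.Factory.FromAsync and attach a continuation from there. In your specific case you should take advantage of CloudBlob.BeginUploadFromStream. How you obtain the stream in the first place matters just as much, so look for an async API there too.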
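As a small illustration of the FromAsync wrapping described above, here is a minimal sketch that applies it to the source side, using the Stream.BeginRead/EndRead pair. The class and method names (FromAsyncSketch, ReadChunkAsync, Demo) are made up for the example, and a MemoryStream stands in for whatever real source you would read from.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class FromAsyncSketch
{
    // Wraps the Begin/End pair on Stream in a Task<int> (number of bytes read).
    public static Task<int> ReadChunkAsync(Stream source, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(
            source.BeginRead,        // begin method
            source.EndRead,          // end method
            buffer, 0, buffer.Length,
            null);                   // no state object
    }

    // Assumption: a MemoryStream stands in for a real file or network stream.
    public static string Demo()
    {
        var source = new MemoryStream(Encoding.UTF8.GetBytes("hello blob"));
        var buffer = new byte[64];

        // The continuation runs once the read has completed.
        Task<string> text = ReadChunkAsync(source, buffer)
            .ContinueWith(read => Encoding.UTF8.GetString(buffer, 0, read.Result));

        // Blocking here only to keep the demo short; real code would keep chaining.
        return text.Result;
    }
}
```

The same shape applies to any BeginXXX/EndXXX pair: pass the two method groups, then the arguments, then the state object.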

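After that, the only thing that might hold you back from using a Small instance is that it is capped at 100 Mbps, whereas a Medium gets 200 Mbps. Beyond that, you can always take advantage of the elasticity factor: increase the role count when you need more processing, and scale back down when things calm down.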

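Here is an example of how you would call BeginUploadFromStream using FromAsync. As for coordinating concurrent processing: since you are now kicking off async tasks, you can't count on Parallel::ForEach to constrain the max concurrency for you. That means you just use a regular foreach on the original thread, together with a Semaphore to limit concurrency. This gives you the equivalent of MaxDegreeOfParallelism: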

// Setup a semaphore to constrain the max # of concurrent "thing"s we will process
int maxConcurrency = ... read from config ...;
Semaphore maxConcurrentThingsToProcess = new Semaphore(maxConcurrency, maxConcurrency);

// Current thread will enumerate and dispatch I/O work async, this will be the only CPU resource we're holding during the async I/O
foreach(Thing thing in myThings)
{
    // Make sure we haven't reached max concurrency yet
    maxConcurrentThingsToProcess.WaitOne();

    try
    {
        Stream mySourceStream = ... get the source stream from somewhere ...;
        CloudBlob myCloudBlob = ... get the blob from somewhere ...;

        // Begin uploading the stream asynchronously
        Task uploadStreamTask = Task.Factory.FromAsync(
            myCloudBlob.BeginUploadFromStream,
            myCloudBlob.EndUploadFromStream,
            mySourceStream,
            null);

        // Setup a continuation that will fire when the upload completes (regardless of success or failure)
        uploadStreamTask.ContinueWith(uploadStreamAntecedent =>
        {
            try
            {
                // upload completed here, do any cleanup/post processing
            }
            finally
            {
                // Release the semaphore so the next thing can be processed
                maxConcurrentThingsToProcess.Release();
            }
        });
    }
    catch
    {
        // Something went wrong starting to process this "thing", release the semaphore
        maxConcurrentThingsToProcess.Release();

        throw;
    }
}

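Now, in this sample I'm not showing how you should get the source stream asynchronously as well. But if, for example, you were downloading that stream from a URL somewhere else, you would want to start that asynchronously too and chain the start of the async upload to it as a continuation.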
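One way to chain the two steps is a continuation that starts the upload, followed by Unwrap. The following is only a sketch: DownloadAsync and UploadAsync are hypothetical stand-ins built on Task.Delay, not real storage or HTTP calls, and the "upload" just reports the stream length.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class ChainSketch
{
    // Hypothetical stand-in for an asynchronous download that yields a stream.
    static Task<Stream> DownloadAsync(string url)
    {
        return Task.Delay(10).ContinueWith(_ =>
            (Stream)new MemoryStream(Encoding.UTF8.GetBytes("payload from " + url)));
    }

    // Hypothetical stand-in for an asynchronous upload; returns the byte count it was given.
    static Task<long> UploadAsync(Stream source)
    {
        return Task.Delay(10).ContinueWith(_ => source.Length);
    }

    // Starts the upload only after the download has completed, without blocking a thread.
    public static Task<long> DownloadThenUpload(string url)
    {
        return DownloadAsync(url)
            // The continuation itself returns a task, so this yields Task<Task<long>> ...
            .ContinueWith(download => UploadAsync(download.Result))
            // ... and Unwrap flattens it to a Task<long> that completes when the upload does.
            .Unwrap();
    }
}
```

In the semaphore loop, the task returned by DownloadThenUpload would take the place of uploadStreamTask, so the semaphore is released only after both steps finish.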

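Believe me, I know this is more code than just doing a simple Parallel::ForEach, but Parallel::ForEach exists to make concurrency for CPU-bound tasks easy. When it comes to I/O, using the async APIs is the only way to achieve maximum I/O throughput while minimizing CPU resources.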
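If your toolchain supports async/await (C# 5 or later; the local function below needs C# 7), the same throttling idea can be written more compactly with SemaphoreSlim.WaitAsync. This is a different technique from the Semaphore/ContinueWith code above, shown only as a sketch: ThrottledSketch and UploadOneAsync are invented names, and Task.Delay stands in for the real upload call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledSketch
{
    // Runs one simulated upload per item with at most maxConcurrency in flight.
    // Returns the peak number of simultaneous uploads observed.
    public static async Task<int> RunAsync(IEnumerable<string> items, int maxConcurrency)
    {
        var gate = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        var sync = new object();
        var pending = new List<Task>();
        int inFlight = 0, peak = 0;

        foreach (string item in items)
        {
            // Waits asynchronously (no blocked thread) until a slot is free.
            await gate.WaitAsync();
            pending.Add(UploadOneAsync(item));
        }

        await Task.WhenAll(pending);
        return peak;

        async Task UploadOneAsync(string name)
        {
            try
            {
                lock (sync) { inFlight++; peak = Math.Max(peak, inFlight); }
                await Task.Delay(20); // assumption: placeholder for the real async upload of `name`
            }
            finally
            {
                lock (sync) { inFlight--; }
                gate.Release(); // free the slot whether the upload succeeded or failed
            }
        }
    }
}
```

Calling `await ThrottledSketch.RunAsync(fileNames, 3)` would keep at most three simulated uploads running at once, which plays the same role as MaxDegreeOfParallelism.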