使用AWS S3 SDK for .NET从Amazon S3下载并行批处理文件

Den*_*kem 13 .net c# amazon-s3 amazon-web-services c#-4.0

问题:我想使用他们的.NET SDK从AWS S3并行下载100个文件.下载的内容应存储在100个内存流中(文件足够小,我可以从那里获取).我在Task,IAsyncResult,Parallel.*和.NET 4.0中的其他不同方法之间感到困惑.

如果我试图自己解决这个问题,我会想象这样的伪代码:(编辑为某些变量添加类型)

using Amazon;
using Amazon.S3;
using Amazon.S3.Model;

AmazonS3 _s3 = ...;
IEnumerable<GetObjectRequest> requestObjects = ...;


// Prepare to launch requests
var asyncRequests = from rq in requestObjects 
    select _s3.BeginGetObject(rq,null,null);

// Launch requests
var asyncRequestsLaunched = asyncRequests.ToList();

// Prepare to finish requests
var responses = from rq in asyncRequestsLaunched 
    select _s3.EndGetRequest(rq);

// Finish requests
var actualResponses = responses.ToList();

// Fetch data
var data = actualResponses.Select(rp => {
    var ms = new MemoryStream(); 
    rp.ResponseStream.CopyTo(ms); 
    return ms;
});
Run Code Online (Sandbox Code Playgroud)

此代码并行启动100个请求,这很好.但是,有两个问题:

  1. 最后一个语句将按顺序下载文件,而不是并行下载.流上似乎没有BeginCopyTo()/ EndCopyTo()方法......
  2. 在所有请求都已响应之前,上述声明不会放弃.换句话说,在所有文件都启动之前,所有文件都不会开始下载.

所以在这里我开始想我正在走错路......

救命?

cas*_*One 21

如果将操作分解为一个异步处理一个请求然后再调用100次的方法,则可能更容易.

首先,让我们确定您想要的最终结果.因为你要使用的是一个MemoryStream意味着你Task<MemoryStream>要从你的方法中返回一个.签名看起来像这样:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)
Run Code Online (Sandbox Code Playgroud)

因为您的AmazonS3对象实现了异步设计模式,所以您可以使用该类上的FromAsync方法从实现异步设计模式的TaskFactory生成一个Task<T>,如下所示:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)
{
    Task<GetObjectResponse> response = 
        Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
            s3.BeginGetObject, s3.EndGetObject, request, null);

    // But what goes here?
Run Code Online (Sandbox Code Playgroud)

因此,您已经处于一个好的位置,您Task<T>可以在呼叫完成时等待或获得回叫.但是,您需要以某种方式GetObjectResponse将从调用返回的内容Task<GetObjectResponse>转换为MemoryStream.

为此,您希望在类上使用该ContinueWith方法Task<T>.把它看成是对的异步版本Select的方法Enumerable,它只是一个投影到另一个Task<T>不同之处在于每次调用时ContinueWith,你可能会创建一个运行的新任务部分代码.

有了它,您的方法如下所示:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)
{
    // Start the task of downloading.
    Task<GetObjectResponse> response = 
        Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
            s3.BeginGetObject, s3.EndGetObject, request, null
        );

    // Translate.
    Task<MemoryStream> translation = response.ContinueWith(t => {
        using (Task<GetObjectResponse> resp = t ){
            var ms = new MemoryStream(); 
            t.Result.ResponseStream.CopyTo(ms); 
            return ms;
        } 
    });

    // Return the full task chain.
    return translation;
}
Run Code Online (Sandbox Code Playgroud)

请注意,在上面你可能会调用传递的重载ContinueWithTaskContinuationOptions.ExecuteSynchronously,因为看起来你做的工作很少(我不知道,响应可能很大).如果您正在进行非常小的工作而TaskContinuationOptions.ExecuteSynchronously不必为了完成工作而开始新任务,那么您应该通过这样做,这样您就不会浪费时间来创建新任务以进行最少的操作.

现在您已经拥有了可以将一个请求转换为a的方法Task<MemoryStream>,创建一个可以处理任意数量的包装器的包装器很简单:

static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3,
    IEnumerable<GetObjectRequest> requests)
{
    // Just call Select on the requests, passing our translation into
    // a Task<MemoryStream>.
    // Also, materialize here, so that the tasks are "hot" when
    // returned.
    return requests.Select(r => GetMemoryStreamAsync(s3, r)).
        ToArray();
}
Run Code Online (Sandbox Code Playgroud)

在上面,您只需执行一系列GetObjectRequest实例,它将返回一个数组Task<MemoryStream>.返回物化序列的事实很重要.如果在返回之前没有实现它,则在迭代序列之前不会创建任务.

当然,如果你想要这种行为,那么无论如何,只需删除调用.ToArray(),让方法返回IEnumerable<Task<MemoryStream>>,然后在迭代任务时进行请求.

从那里,您可以一次处理一个(使用循环中的Task.WaitAny方法)或等待所有这些完成(通过调用Task.WaitAll方法).后者的一个例子是:

static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3, 
    IEnumerable<GetObjectRequest> requests)
{
    Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests);
    Task.WaitAll(tasks);
    return tasks.Select(t => t.Result).ToList();
}
Run Code Online (Sandbox Code Playgroud)

此外,应该提到的是,这非常适合Reactive Extensions框架,因为它非常适合IObservable<T>实现.

  • 这是一个很好的解决方案,在我发布问题后大约20分钟内得到了很好的描述.我很开心.在我修复以添加更准确的S3类名称并指定更具体的FromAsync()方法之后,它对我也很有用.卡斯帕,您是否希望我在更改中编辑您的答案? (2认同)