AWS Lambda / Aws批处理工作流程

Joh*_*ich 6 java workflow amazon-s3 amazon-web-services aws-batch

我写了一个从s3存储桶触发的lambda,以解压缩zip文件并处理其中的文本文档。由于lambda内存的限制,我需要将流程移至类似AWS Batch的位置。如果我错了,请纠正我,但我的工作流程应如下所示。

工作流程

我相信我需要编写一个lambda来将s3存储桶的位置放在亚马逊SQS上,因为AWS批处理可以读取该位置并进行所有解压缩/数据处理,因为它们的内存更大。

这是我当前的lambda,它接收由s3存储桶触发的事件,检查它是否是一个zip文件,然后将该s3键的名称推送到SQS。我应该告诉AWS Batch在我的lambda中开始读取队列吗?一般而言,我对AWS完全陌生,不确定从这里开始。

public class dockerEventHandler implements RequestHandler<S3Event, String> {

private static BigData app = new BigData();
private static DomainOfConstants CONST = new DomainOfConstants();
private static Logger log = Logger.getLogger(S3EventProcessorUnzip.class);

private static AmazonSQS SQS;
private static CreateQueueRequest createQueueRequest;
private static Matcher matcher;
private static String srcBucket, srcKey, extension, myQueueUrl;

@Override
public String handleRequest(S3Event s3Event, Context context) 
{
    try {
        for (S3EventNotificationRecord record : s3Event.getRecords())
        {
            srcBucket = record.getS3().getBucket().getName();
            srcKey = record.getS3().getObject().getKey().replace('+', ' ');
            srcKey = URLDecoder.decode(srcKey, "UTF-8");
            matcher = Pattern.compile(".*\\.([^\\.]*)").matcher(srcKey);

            if (!matcher.matches()) 
            {
                log.info(CONST.getNoConnectionMessage() + srcKey);
                return "";
            }
            extension = matcher.group(1).toLowerCase();

            if (!"zip".equals(extension)) 
            {
                log.info("Skipping non-zip file " + srcKey + " with extension " + extension);
                return "";
            }
            log.info("Sending object location to key" + srcBucket + "//" + srcKey);

            //pass in only the reference of where the object is located
            createQue(CONST.getQueueName(), srcKey);
        }
    }
    catch (IOException e)
    {
        log.error(e);           
    }
    return "Ok";
} 

/*
 * 
 * Setup connection to amazon SQS
 * TODO - Find updated api for sqs connection to eliminate depreciation
 * 
 * */
@SuppressWarnings("deprecation")
public static void sQSConnection() {
    app.setAwsCredentials(CONST.getAccessKey(), CONST.getSecretKey());       
    try{
        SQS = new AmazonSQSClient(app.getAwsCredentials()); 
        Region usEast1 = Region.getRegion(Regions.US_EAST_1);
        SQS.setRegion(usEast1);
    } 
    catch(Exception e){
        log.error(e);       
    }
}

//Create new Queue
public static void createQue(String queName, String message){
    createQueueRequest = new CreateQueueRequest(queName);
    myQueueUrl = SQS.createQueue(createQueueRequest).getQueueUrl();
    sendMessage(myQueueUrl,message);
}

//Send reference to the s3 objects location to the queue
public static void sendMessage(String SIMPLE_QUE_URL, String S3KeyName){
    SQS.sendMessage(new SendMessageRequest(SIMPLE_QUE_URL, S3KeyName));
}

//Fire AWS batch to pull from que
private static void initializeBatch(){
    //TODO
}
Run Code Online (Sandbox Code Playgroud)

我已经安装了泊坞窗并了解了泊坞窗映像。我相信我的docker映像应包含所有代码以在一个docker映像/容器中读取队列,解压缩,处理文件并将其安装到RDS。

我正在寻找可以完成类似任务的人,他们可以分享帮助。类似于以下内容:

S3先生:嘿,我有一个文件

Lambda先生:好的,S3,我看到了,嘿,请问您能解压缩并对此进行处理吗

Batch先生:Gotchya lambda先生,请妥善保管,然后将其放入RDS或某些数据库中。

我还没有编写class / docker映像,但是我已经完成了处理/解压缩并完成rds的所有代码。由于某些文件1GB或更大,Lambda仅限于内存。

Chr*_*ite 7

好的,在浏览完Batch上的AWS文档之后,您不需要SQS队列。批处理具有一个称为“作业队列”的概念,该概念类似于SQS FIFO队列,但不同之处在于这些作业队列具有优先级,并且其中的作业可以依赖于其他作业。基本过程是:

  1. 首先,很奇怪的部分是设置IAM角色,以便容器代理可以与容器服务进行通信,并且AWS Batch能够在需要时启动各种实例(如果您发现实例,则还需要一个单独的角色)。有关所需权限的详细信息,请参见本文档(PDF)第54页左右
  2. 现在,完成此操作后,您就可以设置计算环境了。这些是保存您的容器的按需EC2或竞价型实例。作业在容器级别上进行。这个想法是,您的计算环境是作业容器可以利用的最大资源分配。一旦达到该限制,您的工作就必须等待资源被释放。
  3. 现在,您创建一个作业队列。这会将作业与您创建的计算环境相关联。
  4. 现在,您创建工作定义。好吧,从技术上讲,您不必而且可以通过lambda做到这一点,但这使事情变得容易一些。您的工作定义将指示您的工作将需要哪些容器资源(您当然也可以在lambda中覆盖它)
  5. 现在已经完成了所有操作,您将需要创建一个lambda函数。这将由您的S3存储桶事件触发。该功能将需要必要的IAM权限才能针对批处理服务运行提交作业(以及任何其他权限)。基本上所有lambda要做的就是将提交作业调用到AWS批处理中。您需要的基本参数是作业队列和作业定义。您还将为所需的邮政编码设置S3密钥,作为作业的参数。
  6. 现在,当触发了适当的S3事件时,它将调用lambda,该lambda会将作业提交到AWS批处理作业队列。然后,假设设置一切正常,它将很乐意拉动资源来处理您的工作。请注意,根据EC2实例大小和分配的容器资源,这可能需要一点时间(比准备Lambda函数要长得多)。