Amazon批处理作业进程队列

Fre*_*son 2 queue message-queue amazon-ec2 amazon-web-services

我正在使用AWS EC2实例进行生物信息学工作.我有一个(~1000)个大文件,应该使用EC2实例上的脚本进行处理,结果应该上传回S3存储桶.我想将作业(文件)分发给多个EC2实例,优先以现货价格开始.

我需要的是一个简单易用的排队系统(可能是AWS SQS或其他东西),它可以将作业分配到实例并在实例失败时重新启动作业(由于现货价格过高或其他原因).我研究过AWS SQS示例,但这些示例过于先进,通常涉及自动扩展和复杂的消息生成应用程序.

有人能从概念上指出如何以最简单的方式解决这个问题吗?AWS SQS这个简单应用的任何示例?应该如何启动一堆实例以及如何告诉他们收听队列?

对于每个输入文件,我的工作流程基本上都是这样

aws s3 cp s3://mybucket/file localFile ## Possibly streaming the file without copy
work.py --input localFile --output outputFile
aws s3 cp outputFile s3://mybucket/output/outputFile
Run Code Online (Sandbox Code Playgroud)

Joh*_*ein 11

您正在描述一种非常常见的面向批处理的设计模式:

  • 工作被放入队列中
  • 一个或多个"worker"实例从队列中拉出工作
  • 工作器实例的数量根据队列的大小和工作的紧急程度而扩展
  • 使用现货定价来降低成本

实现这一目标的最佳方法是:

  • 使用Amazon Simple Queuing Service(SQS)存储工作请求
  • 启动Amazon EC2实例,每个实例重复:
    • 从队列的消息
    • 处理消息(例如,通过上面列出的下载/处理/上传步骤)
    • 从队列中删除消息(表示工作已完成)
  • 使用Auto-Scaling控制实例数,以便可以在大量积压的情况下启动更多实例,并且在没有工作时可以关闭所有实例
  • 使用自动扩展组的现货定价,一旦竞价价格降低您的最高出价,实例将自动"恢复生机"

而不是使排队系统"在实例失败时将作业分配给实例并重新启动作业",而SQS仅用于存储作业.Auto-Scaling将负责启动实例(包括重新启动是竞价变化),实例本身会从队列中提取工作.将其视为"拉动"模型而非"推动"模型.

虽然整个系统看起来很复杂,但每个单独的组件都非常简单.我建议一步一步:

  1. 有一个系统以某种方式将工作请求推送到SQS队列.这可以像使用aws sqs put-messageCLI 一样简单,也可以使用Boto(适用于Python的AWS开发工具包)在Python中添加几行代码.

这是一些示例代码(在命令行上使用消息调用它):

#!/usr/bin/python27

import boto, boto.sqs
from boto.sqs.message import Message
from optparse import OptionParser

# Parse command line
parser = OptionParser()
(options, args) = parser.parse_args()

# Send to SQS
q_conn = boto.sqs.connect_to_region('ap-southeast-2')

q = q_conn.get_queue('my-queue')
m = Message()
m.set_body(args[0])
print q.write(m)

print args[0] + ' pushed to Queue'
Run Code Online (Sandbox Code Playgroud)
  1. 配置Amazon EC2实例,该实例可以自动启动从SQS获取并处理您的工作的Python应用程序或脚本.User Data在实例启动时使用该字段触发工作.从shell脚本运行工作流,或者也可以将S3上载/下载代码编写为Python应用程序的一部分(包括循环以继续提取新消息).

这是从队列中检索消息的一些代码:

#!/usr/bin/python27

import boto, boto.sqs
from boto.sqs.message import Message

# Connect to Queue
q_conn = boto.sqs.connect_to_region('ap-southeast-2')
q = q_conn.get_queue('my-queue')

# Get a message
m = q.read(visibility_timeout=15)
if m == None:
  print "No message!"
else:
  print m.get_body()
  q.delete_message(m)
Run Code Online (Sandbox Code Playgroud)
  1. 配置与您刚为EC2创建的配置相匹配的Auto-Scaling启动配置.这告诉Auto-Scaling如何启动实例(例如实例类型,用户数据)以及您愿意支付的Spot Price.
  2. 创建Auto Scaling组以自动启动实例.
  3. 如果您希望Auto Scaling组根据队列的大小添加/删除实例,请配置Scaling策略

也可以看看:


Jon*_*tan 5

截至 2016 年 12 月,AWS 推出了一项名为AWS Batch 的服务 ,该服务可能非常适合(甚至非常适合)问题中描述的工作负载。请先查看批次,然后再选择其他建议之一。