需要boto3和SWF示例

use*_*737 7 python amazon-swf boto3

亚马逊正在推广boto3用于未来的开发,但没有为新的boto3提供足够的文档.

有没有人有他们愿意分享的使用SWF和boto3的示例代码?

Ada*_*lla 16

这是我到目前为止发现的唯一例子:

https://github.com/jhludwig/aws-swf-boto3

因此,流程概述看起来像这样(请注意,这是直接从上面的链接中提取的,但添加了一些额外的注释,更多的是流程).

应该注意的是,SWF根据事物的名称进行操作.由您的代码决定是否赋予这些名称执行意义.例如,您Decider将进行民意调查并使用任务名称决定接下来会发生什么.

我不确定的一些事情.TASKLIST参考我认为是一种命名空间.它不是真正的一系列事物,它更多的是按名称隔离事物.现在我可能完全错了,从我的基本理解,这就是我认为它的说法.

您可以从任何地方运行您的决策者和工人.由于它们可以触及AWS,因此如果您的防火墙允许0.0.0.0/0出口,您将可以访问.

AWS Docs还提到你可以运行lambda,但我还没有找到如何触发它.

创建boto3 swf客户端:

import boto3
from botocore.exceptions import ClientError

swf = boto3.client('swf')
Run Code Online (Sandbox Code Playgroud)

创建一个域

try:
  swf.register_domain(
    name=<DOMAIN>,
    description="Test SWF domain",
    workflowExecutionRetentionPeriodInDays="10" # keep history for this long
  )
except ClientError as e:
    print "Domain already exists: ", e.response.get("Error", {}).get("Code")
Run Code Online (Sandbox Code Playgroud)

创建域后,我们现在注册工作流程:

注册工作流程

try:
  swf.register_workflow_type(
    domain=DOMAIN, # string
    name=WORKFLOW, # string
    version=VERSION, # string
    description="Test workflow",
    defaultExecutionStartToCloseTimeout="250",
    defaultTaskStartToCloseTimeout="NONE",
    defaultChildPolicy="TERMINATE",
    defaultTaskList={"name": TASKLIST } # TASKLIST is a string
  )
  print "Test workflow created!"
except ClientError as e:
  print "Workflow already exists: ", e.response.get("Error", {}).get("Code")
Run Code Online (Sandbox Code Playgroud)

通过注册我们的工作流程,我们现在可以开始分配任务.

将任务分配给工作流程.

您可以分配N个任务.请记住,这些主要是字符串,您的代码将赋予它们执行意义.

try:
  swf.register_activity_type(
    domain=DOMAIN,
    name="DoSomething",
    version=VERSION, # string
    description="This is a worker that does something",
    defaultTaskStartToCloseTimeout="NONE",
    defaultTaskList={"name": TASKLIST } # TASKLIST is a string
  )
  print "Worker created!"
except ClientError as e:
  print "Activity already exists: ", e.response.get("Error", {}).get("Code")
Run Code Online (Sandbox Code Playgroud)

发送开始工作流程

通过创建域,工作流和任务,我们现在可以开始工作流程.

import boto3

swf = boto3.client('swf')

response = swf.start_workflow_execution(
  domain=DOMAIN # string,
  workflowId='test-1001',
  workflowType={
    "name": WORKFLOW,# string
    "version": VERSION # string
  },
  taskList={
      'name': TASKLIST
  },
  input=''
)

print "Workflow requested: ", response
Run Code Online (Sandbox Code Playgroud)

请注意workflowId,这是一个自定义标识符,例如str(uuid.uuid4()).来自文档:

用户定义的与工作流程执行相关联的标识符.您可以使用此选项将自定义标识符与工作流程执行相关联.如果工作流程执行在逻辑上重新启动上一次执行,则可以指定相同的标识符.您不能同时使用相同的workflowId进行两次打开的工作流程执行.

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.start_workflow_execution

在这一点上,没有任何事情会发生,因为我们没有Decider跑步也没有Workers.让我们看看它们的样子.

决胜局

我们的决策者将进行调查,以便做出决定任务以做出以下决定:

import boto3
from botocore.client import Config
import uuid

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)
Run Code Online (Sandbox Code Playgroud)

请注意上面的超时设置.您可以参考此PR以查看其背后的基本原理:

https://github.com/boto/botocore/pull/634

来自Boto3 SWF文档:

工作人员应将其客户端套接字超时设置为至少70秒(比服务保留轮询请求的最长时间高10秒).

PR是使boto3能够执行该功能的原因.

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_decision_task

print "Listening for Decision Tasks"

while True:

  newTask = swf.poll_for_decision_task(
    domain=DOMAIN ,
    taskList={'name': TASKLIST }, # TASKLIST is a string
    identity='decider-1', # any identity you would like to provide, it's recorded in the history
    reverseOrder=False)

  if 'taskToken' not in newTask:
    print "Poll timed out, no new task.  Repoll"

  elif 'events' in newTask:

    eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]
    lastEvent = eventHistory[-1]

    if lastEvent['eventType'] == 'WorkflowExecutionStarted':
      print "Dispatching task to worker", newTask['workflowExecution'], newTask['workflowType']
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'ScheduleActivityTask',
            'scheduleActivityTaskDecisionAttributes': {
                'activityType':{
                    'name': TASKNAME, # string
                    'version': VERSION # string
                    },
                'activityId': 'activityid-' + str(uuid.uuid4()),
                'input': '',
                'scheduleToCloseTimeout': 'NONE',
                'scheduleToStartTimeout': 'NONE',
                'startToCloseTimeout': 'NONE',
                'heartbeatTimeout': 'NONE',
                'taskList': {'name': TASKLIST}, # TASKLIST is a string
            }
          }
        ]
      )
      print "Task Dispatched:", newTask['taskToken']

    elif lastEvent['eventType'] == 'ActivityTaskCompleted':
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'CompleteWorkflowExecution',
            'completeWorkflowExecutionDecisionAttributes': {
              'result': 'success'
            }
          }
        ]
      )
      print "Task Completed!"
Run Code Online (Sandbox Code Playgroud)

请注意,在此代码段的末尾,我们会检查是否有,ActivityTaskCompleted并且我们会做出CompleteWorkflowExecution让SWF知道我们已完成的决定.

这是决策者,工人的样子是什么?

工人

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_activity_task

请再次注意,我们设置了 read_timeout

import boto3
from botocore.client import Config

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)
Run Code Online (Sandbox Code Playgroud)

现在我们开始我们的工人民意调查:

print "Listening for Worker Tasks"

while True:

  task = swf.poll_for_activity_task(
    domain=DOMAIN,# string
    taskList={'name': TASKLIST}, # TASKLIST is a string
    identity='worker-1') # identity is for our history

  if 'taskToken' not in task:
    print "Poll timed out, no new task.  Repoll"

  else:
    print "New task arrived"

    swf.respond_activity_task_completed(
        taskToken=task['taskToken'],
        result='success'
    )

    print "Task Done"
Run Code Online (Sandbox Code Playgroud)

我们再次向SWF表示我们已经完成了我们的工作.