使用Amazon SWF在服务器之间进行通信

Jim*_*mmy 9 python linux boto amazon-web-services amazon-swf

使用Amazon SWF在服务器之间传递消息?

  1. 在服务器AI上想要运行脚本A.
  2. 完成后,我想向服务器B发送一条消息来运行脚本B.
  3. 如果成功完成,我希望它从工作流队列中清除作业

我正在努力研究如何组合使用Boto和SWF来做到这一点.我不是在完成一些完整的代码之后,但我所追求的是,​​如果有人能够更多地解释所涉及的内容.

  • 我如何实际告诉服务器B检查脚本A的完成情况?
  • 如何确保服务器A不会完成脚本A并尝试运行脚本B(因为服务器B应运行此脚本)?
  • 如何实际通知SWF脚本A完成?你是旗帜,消息还是什么?

正如你所看到的,我对这一切真的很困惑,如果有人可以对此有所了解,我会非常感激.

ooz*_*arm 17

我想你会问一些非常好的问题,这些问题突出了SWF作为一项服务的有用性.简而言之,您不会告诉您的服务器之间协调工作.在SWF服务的帮助下,您的决策者会为您协调所有这些.

您的工作流程的实施将如下所示:

  1. 使用服务注册您的工作流程及其活动(一次性).
  2. 实施决策者和工人.
  3. 让你的工人和决策者跑步.
  4. 开始新的工作流程.

有许多方法可以将凭证提供给boto.swf的代码.出于本练习的目的,我建议在运行以下代码之前将它们导出到环境中:

export AWS_ACCESS_KEY_ID=<your access key>
export AWS_SECRET_ACCESS_KEY=<your secret key>
Run Code Online (Sandbox Code Playgroud)

1)要注册域,工作流和活动执行以下操作:

# ab_setup.py
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()
Run Code Online (Sandbox Code Playgroud)

2)实施和运行决策者和工人.

# ab_decider.py
import time
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

class ABDecider(swf.Decider):

    domain = DOMAIN
    task_list = 'default_tasks'
    version = VERSION

    def run(self):
        history = self.poll()
        # Print history to familiarize yourself with its format.
        print history
        if 'events' in history:
            # Get a list of non-decision events to see what event came in last.
            workflow_events = [e for e in history['events']
                               if not e['eventType'].startswith('Decision')]
            decisions = swf.Layer1Decisions()
            # Record latest non-decision event.
            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']
            if last_event_type == 'WorkflowExecutionStarted':
                # At the start, get the worker to fetch the first assignment.
                decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()),
                   ACTIVITY1, VERSION, task_list='a_tasks')
            elif last_event_type == 'ActivityTaskCompleted':
                # Take decision based on the name of activity that has just completed.
                # 1) Get activity's event id.
                last_event_attrs = last_event['activityTaskCompletedEventAttributes']
                completed_activity_id = last_event_attrs['scheduledEventId'] - 1
                # 2) Extract its name.
                activity_data = history['events'][completed_activity_id]
                activity_attrs = activity_data['activityTaskScheduledEventAttributes']
                activity_name = activity_attrs['activityType']['name']
                # 3) Optionally, get the result from the activity.
                result = last_event['activityTaskCompletedEventAttributes'].get('result')

                # Take the decision.
                if activity_name == ACTIVITY1:
                    # Completed ACTIVITY1 just came in. Kick off ACTIVITY2.
                    decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()),
                        ACTIVITY2, VERSION, task_list='b_tasks', input=result)
                elif activity_name == ACTIVITY2:
                    # Server B completed activity. We're done.
                    decisions.complete_workflow_execution()

            self.complete(decisions=decisions)
            return True
Run Code Online (Sandbox Code Playgroud)

工作者要简单得多,如果你不愿意,你不需要使用继承.

# ab_worker.py
import os
import time
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

class MyBaseWorker(swf.ActivityWorker):

    domain = DOMAIN
    version = VERSION
    task_list = None

    def run(self):
        activity_task = self.poll()
        print activity_task
        if 'activityId' in activity_task:
            # Get input.
            # Get the method for the requested activity.
            try:
                self.activity(activity_task.get('input'))
            except Exception, error:
                self.fail(reason=str(error))
                raise error

            return True

    def activity(self, activity_input):
        raise NotImplementedError

class WorkerA(MyBaseWorker):
    task_list = 'a_tasks'

    def activity(self, activity_input):
        result = str(time.time())
        print 'worker a reporting time: %s' % result
        self.complete(result=result)

class WorkerB(MyBaseWorker):
    task_list = 'b_tasks'

    def activity(self, activity_input):
        result = str(os.getpid())
        print 'worker b returning pid: %s' % result
        self.complete(result=result)
Run Code Online (Sandbox Code Playgroud)

3)管理你的决策者和工人.您的决策者和工作人员可能是从单独的主机运行,也可能是从同一台机器运行.打开四个终端并运行你的演员:

首先是你的决定者

$ python -i ab_decider.py 
>>> while ABDecider().run(): pass
... 
Run Code Online (Sandbox Code Playgroud)

然后是工作人员A,你可以从服务器A执行此操作:

$ python -i ab_workers.py 
>>> while WorkerA().run(): pass
Run Code Online (Sandbox Code Playgroud)

然后是工作人员B,可能来自服务器B,但如果你从笔记本电脑上运行它们,那么它也会起作用:

$ python -i ab_workers.py 
>>> while WorkerB().run(): pass
... 
Run Code Online (Sandbox Code Playgroud)

4)最后,开始工作流程.

$ python
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41) 
[GCC 4.4.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto.swf.layer2 as swf
>>> workflows = swf.Domain(name='stackoverflow').workflows()
>>> workflows
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>]
>>> execution = workflows[0].start(task_list='default_tasks')
>>> 
Run Code Online (Sandbox Code Playgroud)

切换回看你的演员会发生什么.一分钟不活动后,他们可能会断开服务.如果发生这种情况,请按向上箭头+输入以重新进入轮询循环.

您现在可以转到AWS管理控制台的SWF面板,查看执行操作的执行情况并查看其历史记录.或者,您可以通过命令行查询它.

>>> execution.history()
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted', 
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'}, 
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy': 
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version': 
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2, 
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes': 
{'startToCloseTimeout': '300', 'taskList': {'name': ...
Run Code Online (Sandbox Code Playgroud)

这只是一个串行执行活动的工作流程的示例,但决策者也可以安排和协调活动的并行执行.

我希望这至少可以让你开始.对于一个稍微复杂的串行工作流示例,我建议看一下.


小智 5

我没有任何示例代码可供共享,但您绝对可以使用SWF来协调跨两个服务器的脚本执行.这个主要思想是创建三个与SWF对话的代码:

  • 一个组件,它知道首先执行哪个脚本以及第一个脚本执行完毕后要执行的操作.这在SWF术语中称为"决策者".
  • 两个组件,每个组件都了解如何执行要在每台计算机上运行的特定脚本.这些被称为SWF术语中的"活动工作者".

第一个组件是决策程序,它调用两个SWF API:PollForDecisionTask和RespondDecisionTaskCompleted.轮询请求将为决策程序组件提供执行工作流的当前历史记录,基本上是脚本运行程序的"我在哪里"状态信息.您编写的代码可以查看这些事件并找出应该执行的脚本.执行脚本的这些"命令"将采用活动任务的调度形式,该活动任务作为对RespondDecisionTaskCompleted的调用的一部分返回.

您编写的第二个组件,活动工作者,每个调用两个SWF API:PollForActivityTask和RespondActivityTaskCompleted.轮询请求将为活动工作者指示它应该执行它知道的脚本,SWF调用活动任务.从轮询请求返回到SWF的信息可以包括作为活动任务调度的一部分发送到SWF的单个执行特定数据.您的每个服务器都将独立轮询SWF以查找活动任务,以指示该主机上本地脚本的执行情况.一旦工作人员完成了脚本的执行,它就会通过RespondActivityTaskCompleted API回调SWF.

从您的活动工作者到SWF的回调导致将新的历史记录分发给我已经提到的决策程序组件.它将查看历史记录,查看第一个脚本已完成,并安排第二个脚本执行.一旦它看到第二个完成,它就可以使用另一种类型的决定"关闭"工作流程.

您可以通过调用StartWorkflowExecution API启动在每个主机上执行脚本的整个过程.这将在SWF中创建整个过程的记录,并将第一个历史记录输出到决策程序进程,以计划在第一个主机上执行第一个脚本.

希望这为如何使用SWF完成此类工作流提供了更多背景信息.如果您还没有,我会查看SWF页面上的开发指南以获取更多信息.