我想将我的一些任务重写为管道.主要是因为我需要一种方法来检测任务何时完成或以特定顺序启动任务.我的问题是我不知道如何将递归任务重写为管道.通过递归我的意思是自称为这样的任务:
class MyTask(webapp.RequestHandler):
def post(self):
cursor = self.request.get('cursor', None)
[set cursor if not null]
[fetch 100 entities form datastore]
if len(result) >= 100:
[ create the same task in the queue and pass the cursor ]
[do actual work the task was created for]
Run Code Online (Sandbox Code Playgroud)
现在我真的想把它写成一个管道并做类似的事情:
class DoSomeJob(pipeline.Pipeline):
def run(self):
with pipeline.InOrder():
yield MyTask()
yield MyOtherTask()
yield DoSomeMoreWork(message2)
Run Code Online (Sandbox Code Playgroud)
任何有关这一方面的帮助将不胜感激.谢谢!
Ane*_*pic 12
基本管道只返回一个值:
class MyFirstPipeline(pipeline.Pipeline):
def run(self):
return "Hello World"
Run Code Online (Sandbox Code Playgroud)
该值必须是JSON可序列化的.
如果需要协调多个管道,则需要使用生成器管道和yield语句.
class MyGeneratorPipeline(pipeline.Pipeline):
def run(self):
yield MyFirstPipeline()
Run Code Online (Sandbox Code Playgroud)
您可以将管道的收益视为返回"未来".
您可以将此未来作为输入arg传递给另一个管道:
class MyGeneratorPipeline(pipeline.Pipeline):
def run(self):
result = yield MyFirstPipeline()
yield MyOtherPipeline(result)
Run Code Online (Sandbox Code Playgroud)
Pipeline API将确保只有在将来将其解析为实际值时才调用run方法.MyOtherPipelineresultMyFirstPipeline
你不能混合yield和return使用相同的方法.如果您使用yield的值必须是Pipeline实例.如果要执行此操作,这可能会导致问题:
class MyRootPipeline(pipeline.Pipeline):
def run(self, *input_args):
results = []
for input_arg in input_args:
intermediate = yield MyFirstPipeline(input_arg)
result = yield MyOtherPipeline(intermediate)
results.append(result)
yield results
Run Code Online (Sandbox Code Playgroud)
在这种情况下,Pipeline API只会在最后一行中看到一个列表yield results,因此它不知道在返回之前解析其中的未来,您将收到错误.
他们没有记录,但有一个公用设施管道库,可以在这里提供帮助:https:
//code.google.com/p/appengine-pipeline/source/browse/trunk/src/pipeline/common.py
所以上面实际工作的版本看起来像:
import pipeline
from pipeline import common
class MyRootPipeline(pipeline.Pipeline):
def run(self, *input_args):
results = []
for input_arg in input_args:
intermediate = yield MyFirstPipeline(input_arg)
result = yield MyOtherPipeline(intermediate)
results.append(result)
yield common.List(*results)
Run Code Online (Sandbox Code Playgroud)
现在我们没问题,我们正在产生一个管道实例,Pipeline API知道如何正确地解决它的未来价值.common.List管道的来源非常简单:
class List(pipeline.Pipeline):
"""Returns a list with the supplied positional arguments."""
def run(self, *args):
return list(args)
Run Code Online (Sandbox Code Playgroud)
...在调用此管道的run方法时,Pipeline API已将列表中的所有项目解析为实际值,可以将其作为传入*args.
无论如何,回到原来的例子,你可以这样做:
class FetchEntitites(pipeline.Pipeline):
def run(self, cursor=None)
if cursor is not None:
cursor = Cursor(urlsafe=cursor)
# I think it's ok to pass None as the cursor here, haven't confirmed
results, next_curs, more = MyModel.query().fetch_page(100,
start_cursor=cursor)
# queue up a task for the next page of results immediately
future_results = []
if more:
future_results = yield FetchEntitites(next_curs.urlsafe())
current_results = [ do some work on `results` ]
# (assumes current_results and future_results are both lists)
# this will have to wait for all of the recursive calls in
# future_results to resolve before it can resolve itself:
yield common.Extend(current_results, future_results)
Run Code Online (Sandbox Code Playgroud)
一开始我说我们可以把result = yield MyPipeline()它看作是回归'未来'.这不是严格意义上的,显然我们实际上只是产生了实例化的管道.(不用说我们的run方法现在是一个生成器函数.)
Python的yield表达式如何工作的奇怪部分是,尽管它看起来像是,你yield去函数之外的某个地方(到Pipeline API设备)而不是你的resultvar.通过调用生成器(生成器是您定义的方法)result,表达式左侧的var 值也从函数外部推入.sendrun
因此,通过生成实例化的Pipeline,您可以让Pipeline API获取该实例并run在其他时间将其方法调用到其他位置(实际上它将作为类名称和一组args和kwargs传递到任务队列中. - 在那里实例化......这就是你的args和kwargs也需要JSON序列化的原因).
同时,管道API sendSA PipelineFuture对象插入到run发电机,这是什么会出现在你的result变种.它似乎有点神奇和反直觉,但这就是带有yield表达式的生成器的工作原理.
对我来说,把它解决到这个水平是非常令人头疼的事情,我欢迎对我出错的任何澄清或更正.
| 归档时间: |
|
| 查看次数: |
2189 次 |
| 最近记录: |