aga*_*and 11 python asynchronous twisted
Twisted(对于python)的强度是它的异步框架(我认为).我写了一个图像处理服务器,通过Perspective Broker接收请求.只要我一次喂它少于几百张图像,它就能很好地工作.然而,有时它几乎同时会被数百张图像加入.因为它尝试同时处理它们所有服务器崩溃.
作为一种解决方案,我想在服务器上排队remote_calls,这样它一次只能处理大约100张图像.看起来这可能是Twisted已经做过的事情,但我似乎无法找到它.关于如何开始实施这个的任何想法?正确方向的一点?谢谢!
Jea*_*one 29
一个可能有用的现成选项是twisted.internet.defer.DeferredSemaphore.这是你可能已经知道的正常(计数)信号量的异步版本,如果你做了很多线程编程的话.
(计数)信号量很像互斥锁(锁定).但是,只有在相应的版本中才能获取互斥锁一次,可以配置(计数)信号量以允许任意(但指定)数量的采集在需要任何相应的版本之前成功.
这是一个使用DeferredSemaphore运行十个异步操作的示例,但最多可以同时运行三个异步操作:
from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def main():
sem = DeferredSemaphore(3)
jobs = []
for i in range(10):
jobs.append(sem.run(async, i))
d = gatherResults(jobs)
d.addCallback(lambda ignored: reactor.stop())
reactor.run()
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
DeferredSemaphore也有明确acquire和release方法,但该run方法是如此方便,它几乎总是你想要什么.它调用acquire方法,返回一个Deferred.首先Deferred,它添加一个调用您传入的函数的回调(以及任何位置或关键字参数).如果该函数返回a Deferred,那么Deferred到那一秒会添加一个调用该release方法的回调.
通过立即调用也可以处理同步情况release.错误也通过允许它们传播来处理,但确保完成必要release的操作以使其DeferredSemaphore处于一致状态.传递给函数的结果run(或的结果Deferred返回)变的结果Deferred通过返回run.
另一种可能的方法可能是基于DeferredQueue和cooperate. DeferredQueue大部分都像普通队列,但它的get方法返回一个Deferred.如果在呼叫时队列中没有项目,则在Deferred添加项目之前不会触发.
这是一个例子:
from random import randrange
from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def assign(jobs):
# Create new jobs to be processed
jobs.put(randrange(10))
reactor.callLater(randrange(10), assign, jobs)
def worker(jobs):
while True:
yield jobs.get().addCallback(async)
def main():
jobs = DeferredQueue()
for i in range(10):
jobs.put(i)
assign(jobs)
for i in range(3):
cooperate(worker(jobs))
reactor.run()
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
请注意,asyncworker函数与第一个示例中的函数相同.但是,这一次,还有一个worker函数,它明确地将作业拉出DeferredQueue并处理它们async(通过添加async作为回调函数的Deferred返回get).所述worker发电机被驱动通过cooperate,其中每个后迭代一次Deferred它产生火灾.然后,主循环启动其中三个工作生成器,以便在任何给定时间执行三个作业.
这种方法涉及的代码比DeferredSemaphore方法多一些,但有一些可能有趣的好处.首先,cooperate返回一个CooperativeTask具有有用的方法类似的例子pause,resume和几个人.此外,分配给同一合作者的所有作业将在调度中相互合作,以免过载事件循环(这就是API的名称).另一方面DeferredQueue,也可以对待处理的项目数量进行限制,这样可以避免服务器完全过载(例如,如果图像处理器卡住并停止完成任务).如果代码调用put处理队列溢出异常,您可以使用此作为压力来尝试停止接受新作业(可能将它们分流到另一台服务器,或警告管理员).做类似的事情DeferredSemaphore有点棘手,因为没有办法限制等待能够获取信号量的作业数量.
| 归档时间: |
|
| 查看次数: |
5945 次 |
| 最近记录: |