Xua*_*uan 2 python concurrency tornado multiprocessing concurrent.futures
我正在尝试运行多个进程,同时使用它concurrent.futures.ProcessPoolExecutor来运行CPU密集型作业.前几个请求很愉快,但随后KeyError提出了一个请求concurrent.futures.process,服务器挂起.
这是龙卷风中的一个错误吗?
这是我删除代码的最简单形式.
服务器:
# -*- coding: utf-8 -*-
"""
server runs 2 processes and does job on a ProcessPoolExecutor
"""
import tornado.web
import tornado.ioloop
import tornado.gen
import tornado.options
import tornado.httpserver
from concurrent.futures import ProcessPoolExecutor
class MainHandler(tornado.web.RequestHandler):
executor = ProcessPoolExecutor(1)
@tornado.gen.coroutine
def post(self):
num = int(self.request.body)
result = yield self.executor.submit(pow, num, 2)
self.finish(str(result))
application = tornado.web.Application([
(r"/", MainHandler),
])
def main():
tornado.options.parse_command_line()
server = tornado.httpserver.HTTPServer(application)
server.bind(8888)
server.start(2)
tornado.ioloop.IOLoop.instance().start()
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
客户:
# -*- coding: utf-8 -*-
"""
client
"""
from tornado.httpclient import AsyncHTTPClient
from tornado.gen import coroutine
from tornado.ioloop import IOLoop
@coroutine
def remote_compute(num):
rsp = yield AsyncHTTPClient().fetch(
'http://127.0.0.1:8888', method='POST', body=str(num))
print 'result:', rsp.body
IOLoop.instance().run_sync(lambda: remote_compute(10))
Run Code Online (Sandbox Code Playgroud)
错误追溯
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/local/Cellar/python/2.7.7_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
self.run()
File "/usr/local/Cellar/python/2.7.7_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run
self.__target(*self.__args, **self.__kwargs)
File "/Users/cliffxuan/.virtualenvs/executor/lib/python2.7/site-packages/concurrent/futures/process.py", line 216, in _queue_management_worker
work_item = pending_work_items[result_item.work_id]
KeyError: 0
Run Code Online (Sandbox Code Playgroud)
这与之间的相互作用tornado和concurrent.futures当您启动tornado多个进程的服务器,使用server.start(2).这内部用于os.fork()创建两个进程.因为您将该Executor变量声明为类变量,所以MainHandler在server.start()实际运行之前,在实现类本身时会对其进行实例化.这意味着两个进程最终共享一个(尽管是分叉的)ProcessPoolExecutor实例.这导致了一些奇怪的问题 - 每个进程都获得了大多数数据结构的写时复制版本Executor,但最终实际上共享了相同的工作进程.
ProcessPoolExecutor不支持在这样的进程之间共享,因此在第二个进程尝试使用时会出现问题Executor.你可以解决它通过只在创建Executor 后的fork发生:
class MainHandler(tornado.web.RequestHandler):
executor = None # None for now
@tornado.gen.coroutine
def post(self):
num = int(self.request.body)
result = yield self.executor.submit(pow, num, 2)
self.finish(str(result))
application = tornado.web.Application([
(r"/", MainHandler),
])
def main():
tornado.options.parse_command_line()
server = tornado.httpserver.HTTPServer(application)
server.bind(8889)
server.start(2) # We fork here
MainHandler.executor = ProcessPoolExecutor(1) # Now we can create the Executor
tornado.ioloop.IOLoop.instance().start()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2212 次 |
| 最近记录: |