Nis*_*n.H 4 python queue multiprocessing
我正在尝试在管理进程下启动数据队列服务器(以便以后可以将其转换为服务),并且虽然数据队列服务器功能在主进程中正常工作,但它在使用创建的进程中不起作用multiprocessing.Process.
dataQueueServer和dataQueueClient代码基于此处多处理模块文档中的代码.
当自己运行时,dataQueueServer运行良好.然而,在使用中运行时multiprocessing.Process的start()在mpquueue,它不工作(当与客户端进行测试).我正在使用dataQueueClient而没有更改来测试这两种情况.
代码确实达到了serve_forever两种情况,所以我认为服务器正在运行,但有些东西阻止它在mpqueue情况下与客户端进行通信.
我已经serve_forever()在线程下放置了运行该部件的循环,因此它可以停止.
这是代码:
mpqueue#这是尝试在子进程中生成服务器的"管理器"进程
import time
import multiprocessing
import threading
import dataQueueServer
class Printer():
def __init__(self):
self.lock = threading.Lock()
def tsprint(self, text):
with self.lock:
print text
class QueueServer(multiprocessing.Process):
def __init__(self, name = '', printer = None):
multiprocessing.Process.__init__(self)
self.name = name
self.printer = printer
self.ml = dataQueueServer.MainLoop(name = 'ml', printer = self.printer)
def run(self):
self.printer.tsprint(self.ml)
self.ml.start()
def stop(self):
self.ml.stop()
if __name__ == '__main__':
printer = Printer()
qs = QueueServer(name = 'QueueServer', printer = printer)
printer.tsprint(qs)
printer.tsprint('starting')
qs.start()
printer.tsprint('started.')
printer.tsprint('Press Ctrl-C to quit')
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
printer.tsprint('\nTrying to exit cleanly...')
qs.stop()
printer.tsprint('stopped')
Run Code Online (Sandbox Code Playgroud)
dataQueueServer
import time
import threading
from multiprocessing.managers import BaseManager
from multiprocessing import Queue
HOST = ''
PORT = 50010
AUTHKEY = 'authkey'
## Define some helper functions for use by the main process loop
class Printer():
def __init__(self):
self.lock = threading.Lock()
def tsprint(self, text):
with self.lock:
print text
class QueueManager(BaseManager):
pass
class MainLoop(threading.Thread):
"""A thread based loop manager, allowing termination signals to be sent
to the thread"""
def __init__(self, name = '', printer = None):
threading.Thread.__init__(self)
self._stopEvent = threading.Event()
self.daemon = True
self.name = name
if printer is None:
self.printer = Printer()
else:
self.printer = printer
## create the queue
self.queue = Queue()
## Add a function to the handler to return the queue to clients
self.QM = QueueManager
self.QM.register('get_queue', callable=lambda:self.queue)
self.queue_manager = self.QM(address=(HOST, PORT), authkey=AUTHKEY)
self.queue_server = self.queue_manager.get_server()
def __del__(self):
self.printer.tsprint( 'closing...')
def run(self):
self.printer.tsprint( '{}: started serving'.format(self.name))
self.queue_server.serve_forever()
def stop(self):
self.printer.tsprint ('{}: stopping'.format(self.name))
self._stopEvent.set()
def stopped(self):
return self._stopEvent.isSet()
def start():
printer = Printer()
ml = MainLoop(name = 'ml', printer = printer)
ml.start()
return ml
def stop(ml):
ml.stop()
if __name__ == '__main__':
ml = start()
raw_input("\nhit return to stop")
stop(ml)
Run Code Online (Sandbox Code Playgroud)
和客户:
dataQueueClient
import datetime
from multiprocessing.managers import BaseManager
n = 0
N = 10**n
HOST = ''
PORT = 50010
AUTHKEY = 'authkey'
def now():
return datetime.datetime.now()
def gen(n, func, *args, **kwargs):
k = 0
while k < n:
yield func(*args, **kwargs)
k += 1
class QueueManager(BaseManager):
pass
QueueManager.register('get_queue')
m = QueueManager(address=(HOST, PORT), authkey=AUTHKEY)
m.connect()
queue = m.get_queue()
def load(msg, q):
return q.put(msg)
def get(q):
return q.get()
lgen = gen(N, load, msg = 'hello', q = queue)
t0 = now()
while True:
try:
lgen.next()
except StopIteration:
break
t1 = now()
print 'loaded %d items in ' % N, t1-t0
t0 = now()
while queue.qsize() > 0:
queue.get()
t1 = now()
print 'got %d items in ' % N, t1-t0
Run Code Online (Sandbox Code Playgroud)
所以似乎解决方案很简单:不要使用serve_forever(),manager.start()而是使用.
根据Eli Bendersky的说法,BaseManager(它的扩展版本SyncManager)已经在一个新的过程中生成了服务器(并且查看了多处理.管理器代码确认了这一点).我遇到的问题源于示例中使用的表单,其中服务器在主进程下启动.
我仍然不明白为什么当在子进程下运行时当前示例不起作用,但这不再是问题.
以下是管理多个队列服务器的工作(以及简化的OP)代码:
服务器:
from multiprocessing import Queue
from multiprocessing.managers import SyncManager
HOST = ''
PORT0 = 5011
PORT1 = 5012
PORT2 = 5013
AUTHKEY = 'authkey'
name0 = 'qm0'
name1 = 'qm1'
name2 = 'qm2'
description = 'Queue Server'
def CreateQueueServer(HOST, PORT, AUTHKEY, name = None, description = None):
name = name
description = description
q = Queue()
class QueueManager(SyncManager):
pass
QueueManager.register('get_queue', callable = lambda: q)
QueueManager.register('get_name', callable = name)
QueueManager.register('get_description', callable = description)
manager = QueueManager(address = (HOST, PORT), authkey = AUTHKEY)
manager.start() # This actually starts the server
return manager
# Start three queue servers
qm0 = CreateQueueServer(HOST, PORT0, AUTHKEY, name0, description)
qm1 = CreateQueueServer(HOST, PORT1, AUTHKEY, name1, description)
qm2 = CreateQueueServer(HOST, PORT2, AUTHKEY, name2, description)
raw_input("return to end")
Run Code Online (Sandbox Code Playgroud)
客户:
from multiprocessing.managers import SyncManager
HOST = ''
PORT0 = 5011
PORT1 = 5012
PORT2 = 5013
AUTHKEY = 'authkey'
def QueueServerClient(HOST, PORT, AUTHKEY):
class QueueManager(SyncManager):
pass
QueueManager.register('get_queue')
QueueManager.register('get_name')
QueueManager.register('get_description')
manager = QueueManager(address = (HOST, PORT), authkey = AUTHKEY)
manager.connect() # This starts the connected client
return manager
# create three connected managers
qc0 = QueueServerClient(HOST, PORT0, AUTHKEY)
qc1 = QueueServerClient(HOST, PORT1, AUTHKEY)
qc2 = QueueServerClient(HOST, PORT2, AUTHKEY)
# Get the queue objects from the clients
q0 = qc0.get_queue()
q1 = qc1.get_queue()
q2 = qc2.get_queue()
# put stuff in the queues
q0.put('some stuff')
q1.put('other stuff')
q2.put({1:123, 2:'abc'})
# check their sizes
print 'q0 size', q0.qsize()
print 'q1 size', q1.qsize()
print 'q2 size', q2.qsize()
# pull some stuff and print it
print q0.get()
print q1.get()
print q2.get()
Run Code Online (Sandbox Code Playgroud)
添加额外的服务器以使用正在运行的队列服务器的信息共享字典,以便消费者可以使用该模型轻松地告诉可用的内容.但有一点需要注意的是,共享字典需要的语法与普通字典略有不同:dictionary[0] = something不起作用.您需要使用dictionary.update([(key, value), (otherkey, othervalue)])和dictionary.get(key)语法,它传播到连接到此字典的所有其他客户端.
| 归档时间: |
|
| 查看次数: |
4710 次 |
| 最近记录: |