ilp*_*omo 0 python multiprocessing
我有 4 个不同的 Python 自定义对象和一个事件队列。每个 obect 都有一个方法,允许它从共享事件队列中检索事件,如果类型是所需的,则处理它,然后将新事件放在同一个事件队列中,允许其他进程处理它。
这是一个例子。
import multiprocessing as mp
class CustomObject:
def __init__(events_queue: mp.Queue) -> None:
self.events_queue = event_queue
def process_events_queue() -> None:
event = self.events_queue.get()
if type(event) == SpecificEventDataTypeForThisClass:
# do something and create a new_event
self.events_queue.put(new_event)
else:
self.events_queue.put(event)
# there are other methods specific to each object
Run Code Online (Sandbox Code Playgroud)
这 4 个对象有特定的任务要做,但它们都共享相同的结构。由于我需要“模拟”生产条件,我希望它们同时运行,彼此独立。
如果可能的话,这只是我想做的一个例子。
import multiprocessing as mp
import CustomObject
if __name__ == '__main__':
events_queue = mp.Queue()
data_provider = mp.Process(target=CustomObject, args=(events_queue,))
portfolio = mp.Process(target=CustomObject, args=(events_queue,))
engine = mp.Process(target=CustomObject, args=(events_queue,))
broker = mp.Process(target=CustomObject, args=(events_queue,))
while True:
data_provider.process_events_queue()
portfolio.process_events_queue()
engine.process_events_queue()
broker.process_events_queue()
Run Code Online (Sandbox Code Playgroud)
我的想法是在单独的进程中运行每个对象,允许它们与通过 events_queue 共享的事件进行通信。所以我的问题是,我该怎么做?
问题是obj = mp.Process(target=CustomObject, args=(events_queue,))返回一个 Process 实例,我无法从中访问 CustomObject 方法。另外,有没有更聪明的方法来实现我想要的?
进程需要一个函数来运行,它定义了进程实际在做什么。一旦这个函数退出(并且没有非守护线程),这个过程就完成了。这类似于 Python 本身总是执行__main__脚本的方式。
如果你这样做mp.Process(target=CustomObject, args=(events_queue,))只是告诉进程调用CustomObject- 它实例化它一次然后完成。这不是您想要的,除非该类在实例化时实际执行工作 - 由于其他原因,这是一个坏主意。
相反,您必须定义一个处理您需要的主要函数或方法:“与通过 events_queue 共享的事件通信”。此函数应侦听队列并根据收到的事件采取行动。
一个简单的实现如下所示:
import os, time
from multiprocessing import Queue, Process
class Worker:
# separate input and output for simplicity
def __init__(self, commands: Queue, results: Queue):
self.commands = commands
self.results = results
# our main function to be run by a process
def main(self):
# each process should handle more than one command
while True:
value = self.commands.get()
# pick a well-defined signal to detect "no more work"
if value is None:
self.results.put(None)
break
# do whatever needs doing
result = self.do_stuff(value)
print(os.getpid(), ':', self, 'got', value, 'put', result)
time.sleep(0.2) # pretend we do something
# pass on more work if required
self.results.put(result)
# placeholder for what needs doing
def do_stuff(self, value):
raise NotImplementedError
Run Code Online (Sandbox Code Playgroud)
这是一个持续处理事件的类的模板。该do_stuff方法必须重载以定义实际发生的情况。
class AddTwo(Worker):
def do_stuff(self, value):
return value + 2
class TimesThree(Worker):
def do_stuff(self, value):
return value * 3
class Printer(Worker):
def do_stuff(self, value):
print(value)
Run Code Online (Sandbox Code Playgroud)
这已经定义了完全工作的进程有效负载:在进程中Process(target=TimesThree(in_queue, out_queue).main)调度main方法,侦听和响应命令。
运行这个主要需要连接各个组件:
if __name__ == '__main__':
# bookkeeping of resources we create
processes = []
start_queue = Queue()
# connect our workers via queues
queue = start_queue
for element in (AddTwo, TimesThree, Printer):
instance = element(queue, Queue())
# we run the main method in processes
processes.append(Process(target=instance.main))
queue = instance.results
# start all processes
for process in processes:
process.start()
# send input, but do not wait for output
start_queue.put(1)
start_queue.put(248124)
start_queue.put(-256)
# send shutdown signal
start_queue.put(None)
# wait for processes to shutdown
for process in processes:
process.join()
Run Code Online (Sandbox Code Playgroud)
请注意,您不需要为此提供课程。你也可以为类似的效果组合函数,只要一切都可以泡菜:
import os, time
from multiprocessing import Queue, Process
def main(commands, results, do_stuff):
while True:
value = commands.get()
if value is None:
results.put(None)
break
result = do_stuff(value)
print(os.getpid(), ':', do_stuff, 'got', value, 'put', result)
time.sleep(0.2)
results.put(result)
def times_two(value):
return value * 2
if __name__ == '__main__':
in_queue, out_queue = Queue(), Queue()
worker = Process(target=main, args=(in_queue, out_queue, times_two))
worker.start()
for message in (1, 3, 5, None):
in_queue.put(message)
while True:
reply = out_queue.get()
if reply is None:
break
print('result:', reply)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1229 次 |
| 最近记录: |