Inf*_*oop 5 python multiprocessing
假设我们将一个数字拆分为不同的域:例如:100 拆分为:[0, 25] [25, 50] [50, 75] [75, 100]。然后,我们将这 4 个列表中的每一个发送到 4 个单独的进程之一进行计算,然后将答案重新组合为数字 100 的单个分割单元。我们根据进程的需要连续迭代此多次' 作为 1000 个数字的单位,分为类似于 [0, 25] [25, 50] [50, 75] [75, 100] 的单独域。如果我们必须关闭进程以使它们充当为得到答案而进行处理的单个组单元,则会出现效率问题。由于 Windows 与 Unix 相比运行进程很糟糕,我们被迫使用“spawn”方法而不是 fork。生成方法在生成过程中速度很慢,所以我想为什么不保持该过程“打开并将数据传入和传出它们,而不需要为并行过程的每个迭代组打开和关闭它们”。下面的示例代码将执行此操作。它将保持进程作为类消费者打开,类消费者将不断地使用 run() (在 while 循环中)请求带有 .get() 可连接队列的 next_task :
import multiprocessing
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill shutdown of .get() loop with break
self.task_queue.task_done()
break
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
for i in range(self.b):
if self.a % i == 0:
return 0
return 1
if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Number of consumers equal to system cpu_count
num_consumers = multiprocessing.cpu_count()
# Make a list of Consumer object process' ready to be opened.
consumers = [ Consumer(tasks, results) for i in range(num_consumers) ]
for w in consumers:
w.start()
# Enqueue jobs for the Class Consumer process' run() while-loop to .get() a workload:
num_jobs = 10
for i in range(num_jobs):
tasks.put(Task(i, 100)) # Similar jobs would be reiterated before poison pill.
# We start to .get() the results in a different loop-
for _ in range(num_jobs): # -so the above loop enqueues all jobs without-
result = results.get() # -waiting for the previous .put() to .get() first.
# Add a poison pill for each consumer
for i in range(num_consumers): # We only do this when all computation is done.
tasks.put(None) # Here we break all loops of open Consumer enqueue-able process'.
Run Code Online (Sandbox Code Playgroud)
这段代码只是一个例子。在此代码的其他变体中:当实现tasks.put()和results.get()的更多迭代时,需要一种方法来导致排队的Task(对象)在完全计算答案之前通过外部调用返回并自行返回。如果您已经从该单个拆分号码组的另一个进程之一获得了答案,这将释放资源。需要存在描述符__call__,Task(object) 才能作为调用tasks.put(Task(i, 100)) 的函数。过去两周我一直在尝试找到一种有效的方法来做到这一点。我需要采取完全不同的方法吗?不要误解我的困境,我正在使用可以工作的代码,但不如我在 Microsoft Windslows 上希望的那么高效。任何帮助将不胜感激。
Task(object) 是否与将其排队的 Consumer() 进程存在于同一进程中?如果是这样,我不能告诉 Class Consumer() Run() 的所有进程停止当前正在运行的 Task(object) 而不关闭它们的 while 循环(使用毒丸),以便它们可以立即接受另一个 Task() 而无需需要再次关闭并重新打开他们的进程吗?当您打开和关闭数千个进程以进行迭代计算时,它确实会增加并浪费时间。我尝试过使用 Events() Managers() 其他 Queues()。似乎没有一种有效的方法可以从外部干预任务(对象)以立即return到达其父消费者(),这样如果其他消费者()之一返回的答案使得它不会继续浪费资源计算其他 Consumer() 任务的计算无关紧要,因为它们都作为拆分成组的单个数字的统一计算来工作。
您所做的是实现了自己的多处理池,但为什么呢?concurrent.futures.ProcessPoolExecutor您是否不知道和类的存在multiprocessing.pool.Pool,后者实际上更适合您的特定问题?
这两个类都实现了多处理池以及用于将任务提交到池并从这些任务获取结果的各种方法。但是,由于在您的特定情况下,您提交的任务正在尝试解决相同的问题,并且您只对第一个可用结果感兴趣,一旦完成,您需要能够终止任何剩余的正在运行的任务。只multiprocessing.pool.Pool允许你这样做。
以下代码使用方法Pool.apply_async来提交任务。此函数不会阻塞,而是返回一个AsyncResult具有阻塞get方法的实例,您可以调用该方法来从提交的任务中获取结果。但由于通常您可能会提交许多任务,因此我们不知道要调用其中的哪一个实例get。因此,解决方案是改用指定函数callback的参数apply_async,只要任务可用,该函数就会使用任务的返回值进行异步调用。那么问题就变成了将这个结果传达回来。有两种方法:
方法一:通过全局变量
from multiprocessing import Pool
import time
def worker1(x):
time.sleep(3) # emulate working on the problem
return 9 # the solution
def worker2(x):
time.sleep(1) # emulate working on the problem
return 9 # the solution
def callback(answer):
global solution
# gets all the returned results from submitted tasks
# since we are just interested in the first returned result, write it to the queue:
solution = answer
pool.terminate() # kill all tasks
if __name__ == '__main__':
t = time.time()
pool = Pool(2) # just two processes in the pool for demo purposes
# submit two tasks:
pool.apply_async(worker1, args=(1,), callback=callback)
pool.apply_async(worker2, args=(2,), callback=callback)
# wait for all tasks to terminate:
pool.close()
pool.join()
print(solution)
print('Total elapsed time:', time.time() - t)
Run Code Online (Sandbox Code Playgroud)
印刷:
9
Total elapsed time: 1.1378364562988281
Run Code Online (Sandbox Code Playgroud)
方法2:通过队列
9
Total elapsed time: 1.1378364562988281
Run Code Online (Sandbox Code Playgroud)
印刷:
9
Total elapsed time: 1.1355643272399902
Run Code Online (Sandbox Code Playgroud)
更新
即使在 Windows 下,与完成任务所需的时间相比,创建和重新创建池的时间也可能相对微不足道,特别是对于以后的迭代,即较大的 值n。如果您调用相同的工作函数,那么第三种方法是使用 pool 方法imap_unordered。我还包含一些代码来测量我的桌面启动新池实例的开销:
from multiprocessing import Pool
from queue import Queue
import time
def worker1(x):
time.sleep(3) # emulate working on the problem
return 9 # the solution
def worker2(x):
time.sleep(1) # emulate working on the problem
return 9 # the solution
def callback(solution):
# gets all the returned results from submitted tasks
# since we are just interested in the first returned result, write it to the queue:
q.put_nowait(solution)
if __name__ == '__main__':
t = time.time()
q = Queue()
pool = Pool(2) # just two processes in the pool for demo purposes
# submit two tasks:
pool.apply_async(worker1, args=(1,), callback=callback)
pool.apply_async(worker2, args=(2,), callback=callback)
# wait for first returned result from callback:
solution = q.get()
print(solution)
pool.terminate() # kill all tasks in the pool
print('Total elapsed time:', time.time() - t)
Run Code Online (Sandbox Code Playgroud)
印刷:
Average pool creation time: 0.053139880299568176
Total elapsed time: 1.169790506362915
9
Run Code Online (Sandbox Code Playgroud)
更新2
困境如下:您有多个进程正在处理一个难题的各个部分。例如,一旦一个进程发现某个数字可以被传递范围内的数字之一整除,那么其他进程在完成其测试后测试不同范围内的数字就没有意义了。您可以做三件事之一。您可以什么都不做,让流程在开始下一次迭代之前完成。但这会延迟下一次迭代。我已经建议您终止进程,从而释放处理器。但这需要您创建新的流程,您发现这并不令人满意。
我只能想到另一种可能性,我在下面使用您的多处理方法介绍它。名为 的多处理共享内存变量stop在每个进程中作为全局变量进行初始化,并在每次迭代之前设置为 0。当一个任务设置为返回值 0 并且在其他进程中运行的其他任务没有继续下去的意义时,它会将 的值设置为stop1。这意味着任务必须定期检查 的值stop,如果已设置则返回到 1。当然,这会增加处理的额外周期。在下面的演示中,我实际上有 100 个任务在排队等待 8 个处理器。但最后 92 个任务将立即发现stop已设置并应在第一次迭代时返回。
顺便说一句:原始代码使用实例multiprocessing.JoinableQueue而不是 a 来对任务进行排队,multiprocessing.Queue并且task_done当消息从队列中取出时,会在该实例上进行调用。然而,从未对该队列进行任何调用join(这会告诉您所有消息何时已被取消),从而违背了拥有这样一个队列的全部目的。事实上,不需要 a ,JoinableQueue因为主进程已提交num_jobs作业并期待num_jobs结果队列上的消息,并且可以循环并从结果队列中提取预期数量的结果。我用一个简单的方法代替了Queue保留JoinableQueue原始代码但注释掉了。此外,Consumer进程可以创建为守护进程(带有参数daemon=True),然后当所有非守护进程(即主进程)终止时,它们将自动终止,从而避免使用特殊的“毒丸”None任务消息。我已经进行了更改,并再次保持原始代码不变,但注释掉以进行比较。
9
Total elapsed time: 1.1355643272399902
Run Code Online (Sandbox Code Playgroud)
印刷:
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
True
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2592 次 |
| 最近记录: |