wks*_*rtz 12 python parallel-processing race-condition python-3.x
我试图找到大约61亿(自定义)项目的最大重量,我想用并行处理这样做.对于我的特定应用程序,有更好的算法,不需要我迭代超过61亿项,但解释它们的教科书是我的头脑,我的老板希望在4天内完成.我想我的公司的花哨的服务器和并行处理有更好的机会.但是,我所知道的关于并行处理的一切都来自于阅读Python 文档.这就是说我很丢失......
我目前的理论是设置一个馈送器进程,一个输入队列,一大堆(比如说30个)工作进程,以及一个输出队列(在输出队列中找到最大元素将是微不足道的).我不明白的是,馈线进程如何告诉工作进程何时停止等待项目通过输入队列.
我曾经考虑过使用multiprocessing.Pool.map_async
我的6.1E9项目的迭代,但是只需要花费将近10分钟来迭代这些项目而不对它们做任何事情.除非我误解了某些东西......在map_async
迭代过程中将它们分配给流程可以在流程开始工作时完成.(Pool
也提供imap
但是文档说它类似于map
,它似乎不是异步工作.我想要异步,对吗?)
相关问题:我想用concurrent.futures
而不是multiprocessing
吗?我不可能是第一个实施双排队系统的人(这正是美国每家熟食店的生产线如何工作......)那么有更多的Pythonic /内置方法吗?
这是我正在尝试做的一个框架.请参阅中间的注释块.
import multiprocessing as mp
import queue
def faucet(items, bathtub):
"""Fill bathtub, a process-safe queue, with 6.1e9 items"""
for item in items:
bathtub.put(item)
bathtub.close()
def drain_filter(bathtub, drain):
"""Put maximal item from bathtub into drain.
Bathtub and drain are process-safe queues.
"""
max_weight = 0
max_item = None
while True:
try:
current_item = bathtub.get()
# The following line three lines are the ones that I can't
# quite figure out how to trigger without a race condition.
# What I would love is to trigger them AFTER faucet calls
# bathtub.close and the bathtub queue is empty.
except queue.Empty:
drain.put((max_weight, max_item))
return
else:
bathtub.task_done()
if not item.is_relevant():
continue
current_weight = item.weight
if current_weight > max_weight:
max_weight = current_weight
max_item = current_item
def parallel_max(items, nprocs=30):
"""The elements of items should have a method `is_relevant`
and an attribute `weight`. `items` itself is an immutable
iterator object.
"""
bathtub_q = mp.JoinableQueue()
drain_q = mp.Queue()
faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
worker_procs = mp.Pool(processes=nprocs)
faucet_proc.start()
worker_procs.apply_async(drain_filter, bathtub_q, drain_q)
finalists = []
for i in range(nprocs):
finalists.append(drain_q.get())
return max(finalists)
Run Code Online (Sandbox Code Playgroud)
我找到了一个非常彻底的回答我的问题,以及Python基金会通讯主管Doug Hellman对多任务处理的简单介绍.我想要的是"毒丸"模式.请在此处查看:http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html
转发@MRAB发布该概念的内核.
归档时间: |
|
查看次数: |
2599 次 |
最近记录: |