Bry*_*yce 6 python python-3.x python-multiprocessing
当我运行以下代码块时:
global manager
global lock
manager = Manager()
lock = manager.Lock()
class MyClass(object):
def get_next_chunk(self, numberlist, chunks):
for i in range(0, len(numberlist), chunks):
yield numberlist[i:i + chunks]
def multi_process(self, numberlist):
procs = 5
chunksize = 100
with Pool(procs) as pool:
pool.map(self.process_numberlist,
self.get_next_chunk(numberlist, chunksize))
return self.running_total_list
def process_numberlist(self, numberlist):
temp_num_list = []
temp_num_list = self.getnewNumbers()
logger.debug("temp_num_list length: " + str(len(temp_num_list)))
try:
lock.acquire()
except Exception as e:
logger.error("Couldn't acquire lock")
logger.error(e)
traceback.format_exc()
logger.error(sys.exc_info()[0])
self.running_total_list = self.running_total_list + temp
logger.debug("New running_total_list length: "
+ str(len(self.running_total_list)))
lock.release()
break
Run Code Online (Sandbox Code Playgroud)
我的日志中的输出如下所示:
[process_numberlist() ] temp_num_list length: 5
[process_numberlist() ] New running_total_list result set length: 5
[process_numberlist() ] temp_num_list length: 6
[process_numberlist() ] New running_total_list result set length: 6
[process_numberlist() ] temp_num_list length: 4
[process_numberlist() ] New running_total_list result set length: 9
Run Code Online (Sandbox Code Playgroud)
当我达到预期的输出时,我相信应该像这样:
[process_numberlist() ] temp_num_list length: 5
[process_numberlist() ] New running_total_list result set length: 5
[process_numberlist() ] temp_num_list length: 6
[process_numberlist() ] New running_total_list result set length: 11
[process_numberlist() ] temp_num_list length: 4
[process_numberlist() ] New running_total_list result set length: 15
Run Code Online (Sandbox Code Playgroud)
编辑-尝试2
请参阅根据亚伦的建议进行的更新。现在收到“只能加入可迭代”错误
global manager
global lock
class MyClass(object):
def get_next_chunk(self, numberlist, chunks):
for i in range(0, len(numberlist), chunks):
yield numberlist[i:i + chunks]
def multi_process(self, numberlist):
procs = 5
chunksize = 100
manager = Manager()
lock = manager.Lock()
with Pool(procs) as pool:
func = partial(self.process_numberlist, lock)
pool.map(function,
self.get_next_chunk(numberlist, chunksize))
return self.running_total_list
def process_numberlist(self, numberlist, lock):
temp_num_list = []
temp_num_list = self.getnewNumbers()
logger.debug("temp_num_list length: " + str(len(temp_num_list)))
try:
lock.acquire()
self.running_total_list = self.running_total_list + temp_num_list
logger.debug("New running_total_list length: "
+ str(len(self.running_total_list)))
lock.release()
except Exception as e:
logger.error("Couldn't acquire lock")
logger.error(e)
traceback.format_exc()
logger.error(sys.exc_info()[0])
break
Run Code Online (Sandbox Code Playgroud)
编辑#3-此玩具示例中未包含的getNewNumbers()仅返回整数数组。希望能有所帮助
在我看来,您的主要目标是访问共享资源(running_total_list
),这就是为什么我特别关注这一点。
在你的例子中你使用了Pool
,而我使用了Process
。您可以查看这篇文章,了解两者之间的核心差异,并决定哪一个更适合您的用例。
我想出了这个关于如何在多个进程之间共享资源的快速示例。这应该可以让您了解如何从那里继续:
from multiprocessing import Process, Lock, Manager
def gen_numbers():
import random
return [i for i in range(random.randint(4,11))]
def process_numberlist(lock, shared_list, num):
temp_num_list = gen_numbers()
print("Proc %s: temp_num_list length: %s" %(num, len(temp_num_list)))
try:
lock.acquire()
shared_list += temp_num_list
print("Proc %s: New shared_list length: %s" %(num, len(shared_list)))
finally:
lock.release()
lock = Lock()
manager = Manager()
shared_list = manager.list()
proc = 5
proc_list = []
for num in range(proc):
p = Process(target=process_numberlist, args=( lock, shared_list, num+1, ))
p.start()
proc_list.append( p )
for p in proc_list:
p.join()
Run Code Online (Sandbox Code Playgroud)
需要注意的一件重要事情是此处 a 的定义shared_list
。与线程不同,每个进程都有自己的内存空间(Pool
也不例外),这就是为什么它们之间无法共享数据的原因。这意味着,您需要实现某种进程间通信(IPC),幸运的是 python 已经为您提供了一些工具。其中之一是multiprocessing.Manager
。它公开了一些数据结构(例如dict
或list
),您可以使用它们在进程之间共享。
在这种情况下也是如此Lock
。这很重要,因为您不想同时从多个进程访问共享内存。这只会让你的程序变得不可预测。
另一件需要注意的事情是,执行不一定按顺序process_numberlist
发生,因为每个进程彼此独立运行,但它们都可以访问相同的资源。
我希望这有帮助!
归档时间: |
|
查看次数: |
200 次 |
最近记录: |