Ada*_*dam 5 multiprocessing python-3.x
我有一种需要处理大型数据库的方法,需要花费数小时/天的时间来挖掘参数存储在一个(长)列表中,其中应在一批中处理 max X 。该方法不需要返回任何内容,但我返回“True”表示“有趣”......
当我线性迭代它时(生成/附加此处未看到的其他表中的结果),该函数工作正常,但我无法获得 apply_async 或 map_async 工作。(它之前在其他项目中有效)任何关于我可能做错了什么的提示将不胜感激,提前致谢!参见下面的代码:
import multiprocessing as mp
class mainClass:
#loads of stuff
def main():
multiprocess = True
batchSize = 35
mC = mainClass()
while True:
toCheck = [key for key, value in mC.lCheckSet.items()] #the tasks are stored in a dictionary, I'm referring to them with their keys, which I turn to a list here for iteration.
if multiprocess == False:
#this version works perfectly fine
for i in toCheck[:batchSize]:
mC.check(i)
else:
#the async version does not, either with apply_async...
with mp.Pool(processes = 8) as pool:
temp = [pool.apply_async(mC.check, args=(toCheck[n],)) for n in range(len(toCheck[:batchSize]))]
results = [t.get() for t in temp]
#...or as map_async
pool = mp.Pool(processes = 8)
temp = pool.map_async(mC.check, toCheck[:batchSize])
pool.close()
pool.join()
if __name__=="__main__":
main()
Run Code Online (Sandbox Code Playgroud)
这里的“味道”是,您在主进程上实例化 maincClass 一次,然后尝试在不同进程上调用它的方法 - 但请注意,当您传递到mC.check进程池时,它已经是一个方法绑定到在此过程中实例化的类。
我猜你的问题就出在这里。尽管这可能有效 - 并且确实有效 - 我制作了这个简化版本并且它按预期工作:
import multiprocessing as mp
import random, time
class MainClass:
def __init__(self):
self.value = 1
def check(self, arg):
time.sleep(random.uniform(0.01, 0.3))
print(id(self),self.value, arg)
def main():
mc = MainClass()
with mp.Pool(processes = 4) as pool:
temp = [pool.apply_async(mc.check, (i,)) for i in range(8)]
results = [t.get() for t in temp]
main()
Run Code Online (Sandbox Code Playgroud)
(您是否尝试过添加一些prints 以确保该方法根本没有运行?)因此,问题可能出在 MainClass 中的某些复杂状态中,而这些状态无法以良好的方式进入并行进程。一种可能的解决方法是在每个进程内实例化您的主类 - 这可以轻松完成,因为 MultiProcessing 允许您获取current_process,并使用该对象作为命名空间,以在不同的调用中将数据保存在工作池中实例化的进程中应用异步。
因此,创建一个check如下所示的新函数 - 并且不要在主进程中实例化主类,而是在池中的每个进程中实例化它:
import multiprocessing as mp
import random, time
def check(arg):
process = mp.current_process
if not hasattr(process, "main_class"):
process.main_class = MainClass()
process.main_class.check(arg)
class MainClass:
def __init__(self):
self.value = random.randrange(100)
def check(self, arg):
time.sleep(random.uniform(0.01, 0.3))
print(id(self),self.value, arg)
def main():
mc = MainClass()
with mp.Pool(processes = 2) as pool:
temp = [pool.apply_async(check, (i,)) for i in range(8)]
results = [t.get() for t in temp]
main()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7386 次 |
| 最近记录: |