该multiprocessing模块的文档显示了如何将队列传递给以multiprocessing.Process.开头的进程.但是,如何与异步工作进程共享队列apply_async?我不需要动态加入或其他任何东西,只是工人(反复)将结果报告回基地的一种方式.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))
Run Code Online (Sandbox Code Playgroud)
这失败了:
RuntimeError: Queue objects should only be shared between processes through inheritance.我理解这意味着什么,我理解继承的建议,而不是要求pickle/unpickling(以及所有特殊的Windows限制).但如何做我传递队列中一个可行的办法?我找不到一个例子,我尝试了几种以各种方式失败的替代品.请帮忙?
python queue parallel-processing multiprocessing python-multiprocessing
我正在multiprocessing.Pool对象中使用该类并尝试执行以下操作:
from multiprocessing import Lock, Pool
class A:
def __init__(self):
self.lock = Lock()
self.file = open('test.txt')
def function(self, i):
self.lock.acquire()
line = self.file.readline()
self.lock.release()
return line
def anotherfunction(self):
pool = Pool()
results = pool.map(self.function, range(10000))
pool.close()
pool.join()
return results
Run Code Online (Sandbox Code Playgroud)
但是,我收到一个运行时错误,指出锁对象只能通过继承在进程之间共享。我对 Python 和多处理相当陌生。我怎样才能走上正确的轨道?
当我运行以下代码时:
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Queue
q = Queue()
def my_task(x, queue):
queue.put("Task Complete")
return x
with ProcessPoolExecutor() as executor:
tasks = [executor.submit(my_task, i, q) for i in range(10)]
for task in as_completed(tasks):
print(task.result())
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
obj = _ForkingPickler.dumps(obj)
File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
File "/usr/lib/python3.10/multiprocessing/queues.py", line 58, in __getstate__
context.assert_spawning(self)
File "/usr/lib/python3.10/multiprocessing/context.py", line 373, in assert_spawning
raise RuntimeError(
RuntimeError: Queue …Run Code Online (Sandbox Code Playgroud)