我知道multiprocessing.Manager()
它是如何用于创建共享对象的,特别是可以在worker之间共享的队列.有这个问题,这个问题,这个问题甚至是我自己的一个问题.
但是,我需要定义很多队列,每个队列都链接一对特定的进程.假设每对进程及其链接队列由变量标识key
.
当我需要放置和获取数据时,我想使用字典来访问我的队列.我无法做到这一点.我尝试过很多东西.随着multiprocessing
进口为mp
:
for key in all_keys: DICT[key] = mp.Queue
在多处理模块导入的配置文件中定义一个dict (调用它multi.py
)不会返回错误,但是队列DICT[key]
之间没有共享队列,每个队列似乎都有自己的队列副本,因此没有通信发生.
如果我尝试在定义DICT
进程并启动它们的主多处理函数的开头定义,比如
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Queue()
Run Code Online (Sandbox Code Playgroud)
我收到了错误
RuntimeError: Queue objects should only be shared between processes through
inheritance
Run Code Online (Sandbox Code Playgroud)
改为
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Manager().Queue()
Run Code Online (Sandbox Code Playgroud)
只会让事情变得更糟.multi.py
在主函数的头部而不是在main函数内部尝试类似的定义会返回类似的错误.
必须有一种方法可以在进程之间共享许多队列,而无需在代码中明确命名每个队列.有任何想法吗?
编辑
这是该程序的基本架构:
1-加载第一个模块,它定义一些变量,导入multi
,启动multi.main()
和加载另一个模块,该模块启动一系列模块加载和代码执行.与此同时...
2- …
我有一个下载器功能,可以并行下载多个文件.我用multiprocessing.Pool.map_async
它来下载同一文件的不同块.我想显示下载的状态栏.为此,我需要知道已经下载的总字节数(total_bytes_dl
).
pool = multiprocessing.Pool(processes)
mapObj = pool.map_async(f, args)
while not mapObj.ready():
status = r"%.2f MB / %.2f MB" % (total_bytes_dl / 1024.0 / 1024.0, filesize / 1024.0 / 1024.0,)
status = status + chr(8)*(len(status)+1)
print status,
time.sleep(0.5)
Run Code Online (Sandbox Code Playgroud)
有没有办法设置一个将在所有这些进程和主进程之间共享的变量,因此每个进程都可以附加刚刚下载的字节数量?
我正在使用 ProcessPoolExecutor 上下文管理器并行运行多个 Kafka 使用者。我需要存储子进程的进程 ID,以便稍后我可以干净地终止这些进程。我有这样的代码:
Class MultiProcessConsumer:
...
def run_in_parallel(self):
parallelism_factor = 5
with ProcessPoolExecutor() as executor:
processes = [executor.submit(self.consume) for _ in range(parallelism_factor)]
# It would be nice If I could write [process.pid for process in processes] to a file here.
def consume(self):
while True:
for message in self.kafka_consumer:
do_stuff(message)
Run Code Online (Sandbox Code Playgroud)
os.get_pid()
我知道我可以在 Consumer 方法中使用来获取 PID。但是,正确处理它们(在消费者不断关闭或启动的情况下)需要一些额外的工作。
您建议我如何在这样的上下文中获取并存储子进程的 PID?
我想在多个进程之间共享numpy数组.有工作的解决方案在这里.但是它们都通过继承将数组传递给子进程,这对我来说不起作用,因为我必须事先启动一些工作进程,而且我不知道稍后我要处理多少个数组.有没有办法在进程启动后创建这样的数组并通过队列将这些数组传递给进程?
顺便说一下,我无法使用multiprocessing.Manager
.
相关问题出现在Why I can't use multiprocessing.Queue with ProcessPoolExecutor? 。我提供了部分答案以及解决方法,但承认这个问题引发了另一个问题,即为什么可以multiprocessing.Queue
将实例作为参数传递给辅助函数。multiprocessing.Process
例如,以下代码在使用spawn或fork方法创建新进程的平台下会失败:
from multiprocessing import Pool, Queue
def worker(q):
print(q.get())
with Pool(1) as pool:
q = Queue()
q.put(7)
pool.apply(worker, args=(q,))
Run Code Online (Sandbox Code Playgroud)
上述提出:
RuntimeError: Queue objects should only be shared between processes through inheritance
但下面的程序运行没有问题:
from multiprocessing import Process, Queue
def worker(q):
print(q.get())
q = Queue()
q.put(7)
p = Process(target=worker, args=(q,))
p.start()
p.join()
Run Code Online (Sandbox Code Playgroud)
看来,多处理池工作函数的参数最终会放在池的输入队列上,该队列是作为 a 实现的,multiprocessing.SimpleQueue
并且您不能将实例放入使用 a进行序列化的实例中。multiprocessing.Queue
multiprocessing.SimpleQueue
ForkingPickler
那么,multiprocessing.Queue
当作为参数传递给 a …
我有两个 python 类,它们共享一个基类,假设定义了一个 multiprocessing.Queue。
这些类中的每一个都将在单独的子进程上启动,并且需要通过队列共享信息。
现在,如果基类定义了一个队列,那么每个子类对象将实例化它自己的队列,从而无法共享队列元素。但话又说回来,我发现这记录
Queue objects should only be shared between processes through inheritance.
Run Code Online (Sandbox Code Playgroud)
那么在子进程之间共享队列的正确方法是什么,上面的句子甚至如何有意义?
请注意,我显然可以在子进程初始化时传递对队列的引用,但我想使用继承来解决这个问题
从线程到进程,我已切换到并发。futures,并且希望获得/保留在各种情况下在 aThreadPoolExecutor
和 a之间切换的灵活性ProcessPoolExecutor
。然而,尽管承诺有一个统一的外观,但当我切换到使用时,我很难将多处理Queue
对象作为参数传递:futures.submit()
ProcessPoolExecutor
import multiprocessing as mp\nimport concurrent.futures\n\ndef foo(q):\n q.put('hello')\n\nif __name__ == '__main__':\n\n executor = concurrent.futures.ProcessPoolExecutor()\n q = mp.Queue()\n p = executor.submit(foo, q)\n p.result()\n print(q.get())\n
Run Code Online (Sandbox Code Playgroud)\n遇到来自多处理代码的以下异常:
\n\n\nRuntimeError:队列对象只能通过继承在进程之间共享
\n
我认为这意味着它不喜欢接收队列作为参数,而是期望(不是在任何 OOP 意义上)在多处理分支上“继承它”,而不是将其作为参数获取。
\n不同之处在于,对于简单的多处理,这意味着当不通过并发.futures 为 \xe2\x80\x95 的外观使用它时,似乎没有这样的限制,因为以下代码可以无缝工作:
\nimport multiprocessing as mp\n\ndef foo(q):\n q.put('hello')\n\nif __name__ == '__main__':\n q = mp.Queue()\n p = mp.Process(target=foo, args=(q,))\n p.start()\n p.join()\n print(q.get())\n
Run Code Online (Sandbox Code Playgroud)\n我想知道我在这个 \xe2\x80\x95 中缺少什么,如何ProcessPoolExecutor
在使用并发.futures 时接受队列作为参数,就像使用 or ThreadPoolExecutor
multiprocessing 时一样直接如上所示? …