我在python中编写了一个多处理程序.我用来multiprocessing.Manager().list()在子进程内共享列表.首先,我在主要过程中添加一些任务.然后,启动一些子进程来执行在共享列表中执行的任务,子进程还将任务添加到共享列表.但我有一个例外,如下:
Traceback (most recent call last):
File "/usr/lib64/python2.6/multiprocessing/process.py", line 232, in _bootstrap
self.run()
File "/usr/lib64/python2.6/multiprocessing/process.py", line 88, in run
self._target(*self._args, **self._kwargs)
File "gen_friendship.py", line 255, in worker
if tmpu in nodes:
File "<string>", line 2, in __contains__
File "/usr/lib64/python2.6/multiprocessing/managers.py", line 722, in _callmethod
self._connect()
File "/usr/lib64/python2.6/multiprocessing/managers.py", line 709, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/usr/lib64/python2.6/multiprocessing/connection.py", line 143, in Client
c = SocketClient(address)
File "/usr/lib64/python2.6/multiprocessing/connection.py", line 263, in SocketClient
s.connect(address)
File "<string>", line 1, in connect
error: [Errno …Run Code Online (Sandbox Code Playgroud) 我知道multiprocessing.Manager()它是如何用于创建共享对象的,特别是可以在worker之间共享的队列.有这个问题,这个问题,这个问题甚至是我自己的一个问题.
但是,我需要定义很多队列,每个队列都链接一对特定的进程.假设每对进程及其链接队列由变量标识key.
当我需要放置和获取数据时,我想使用字典来访问我的队列.我无法做到这一点.我尝试过很多东西.随着multiprocessing进口为mp:
for key in all_keys: DICT[key] = mp.Queue在多处理模块导入的配置文件中定义一个dict (调用它multi.py)不会返回错误,但是队列DICT[key]之间没有共享队列,每个队列似乎都有自己的队列副本,因此没有通信发生.
如果我尝试在定义DICT进程并启动它们的主多处理函数的开头定义,比如
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Queue()
Run Code Online (Sandbox Code Playgroud)
我收到了错误
RuntimeError: Queue objects should only be shared between processes through
inheritance
Run Code Online (Sandbox Code Playgroud)
改为
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Manager().Queue()
Run Code Online (Sandbox Code Playgroud)
只会让事情变得更糟.multi.py在主函数的头部而不是在main函数内部尝试类似的定义会返回类似的错误.
必须有一种方法可以在进程之间共享许多队列,而无需在代码中明确命名每个队列.有任何想法吗?
编辑
这是该程序的基本架构:
1-加载第一个模块,它定义一些变量,导入multi,启动multi.main()和加载另一个模块,该模块启动一系列模块加载和代码执行.与此同时...
2- …
如何使用Python的多处理将字典传递给函数?文档:https://docs.python.org/3.4/library/multiprocessing.html#reference说传递字典,但我一直在
TypeError: fp() got multiple values for argument 'what'
Run Code Online (Sandbox Code Playgroud)
这是代码:
from multiprocessing import Pool, Process, Manager
def fp(name, numList=None, what='no'):
print ('hello %s %s'% (name, what))
numList.append(name+'44')
if __name__ == '__main__':
manager = Manager()
numList = manager.list()
for i in range(10):
keywords = {'what':'yes'}
p = Process(target=fp, args=('bob'+str(i)), kwargs={'what':'yes'})
p.start()
print("Start done")
p.join()
print("Join done")
print (numList)
Run Code Online (Sandbox Code Playgroud) 我有一个python pandas数据帧的字典.这本词典的总大小约为2GB.但是,当我在16个多处理中共享它时(在子进程中我只读取dict的数据而不修改它),它需要32GB ram.所以我想问一下,如果我可以在不进行复制的情况下跨多处理共享这个字典.我试图将它转换为manager.dict().但似乎需要太长时间.实现这一目标的最标准方法是什么?谢谢.
因此,假设您有一个 Python 进程,它正在以每秒大约 500 行的速度实时收集来自排队系统的数据(这可以进一步并行化以减少到大约 50 ps)并将其附加到DataFrame:
rq = MyRedisQueue(..)
df = pd.DataFrame()
while 1:
recv = rq.get(block=True)
# some converting
df.append(recv, ignore_index = True)
Run Code Online (Sandbox Code Playgroud)
现在的问题是:如何根据这些数据利用 CPU?所以我充分认识到的局限性GIL,看着成多处理管理器 的命名空间,这里太,但它看起来像有一些缺点关于延迟的centerally保持数据帧。在深入研究之前,我还尝试pool.map了我认为pickle在进程之间应用的方法,这种方法很慢并且开销太大。
所以在所有这一切之后,我终于想知道,如何(如果)每秒 500 行(甚至每秒 50 行)的插入可以传输到不同的进程,同时还有一些 CPU 时间用于对子项中的数据应用统计和启发式流程?
也许在两个进程之间实现自定义 tcp 套接字或排队系统会更好?或者在pandas其他库中是否有一些实现可以真正允许快速访问父进程中的一个大数据框?我爱熊猫!
我有以下几个在多个消费者之间共享的类(使用生产者/消费者方法).我的问题涉及这个类上调用的方法.我需要实现锁定还是经理类线程安全?
import multiprocessing as mp
from multiprocessing.manager import BaseManager
class SampleClass(object):
def __init__(self):
self._count = 0
# Does locking need to be implemented here?
def increment(self):
self._count += 1
BaseManager.register('SampleClass', SampleClass)
manager = BaseManager()
manager.start()
instance = manager.SampleClass()
jobs = []
for i in range(0, 5):
p = mp.Process(target=some_func, args=(instance,))
jobs.append(p)
p.start()
for p in jobs:
p.join()
Run Code Online (Sandbox Code Playgroud) 我面临多处理问题.多处理堆栈溢出问题的很大一部分没有我的情况复杂,也没有回答它.有些人投票说这个问题可能重复,但我的情况有所不同,在我的情况下,共享DICT在进程之间被修改了:
我有一个程序遵循这个简化的生命周期:
A. Initialize DATA dict
B. Initialize 4 subprocess workers
C. Execute code in each workers (worker massively read DATA dict)
D. Wait workers job is done
E. Modify DATA dict content
F. Go to C
Run Code Online (Sandbox Code Playgroud)
性能是问题的一个非常重要的方面.我尝试了许多正面和负面的解决方案:
在步骤中B,DICT变量被分叉到子流程环境中.但经过一步E子过程无法看到变化.
在步骤Adict创建时multiprocessing.Manager(请参阅此处的 "服务器进程" ).
multiprocessing.Manager使用序列化层(我不太了解它,但它能够与网络上的进程一起工作),这对性能有害.multiprocessing.Value并multiprocessing.Array允许使用共享内存.我尝试用几个替换我的dict multiprocessing.Value并且multiprocessing.Array像这样:
用dict:
manager = multiprocessing.Manager()
dict = …Run Code Online (Sandbox Code Playgroud) 我正在使用concurrent.futures.ThreadPoolExecutor来查看我是否可以从我的四核处理器(具有8个逻辑核心)中挤出更多工作.所以我写了下面的代码:
from concurrent import futures
def square(n):
return n**2
def threadWorker(t):
n, d = t
if n not in d:
d[n] = square(n)
def master(n, numthreads):
d = {}
with futures.ThreadPoolExecutor(max_workers=numthreads) as e:
for i in e.map(threadWorker, ((i, d) for i in range(n))):
pass # done so that it actually fetches each result. threadWorker has its own side-effects on d
return len(d)
if __name__ == "__main__":
print('starting')
print(master(10**6, 6))
print('done')
Run Code Online (Sandbox Code Playgroud)
有趣的是,在for循环中编写相同的功能需要大约一秒钟:
>>> d = {}
>>> for i in range(10**6): …Run Code Online (Sandbox Code Playgroud) python multithreading python-3.x threadpoolexecutor concurrent.futures
我有一个相当复杂的递归函数,有许多参数(Obara-Saika-Scheme,如果有人想知道),我想更有效地推测.作为我应用的第一步@functools.lru_cache.作为第二步,我现在想multiprocessing.Pool用于异步评估一长串输入参数.
调整functools Python文档中的第二个示例并添加我拥有的工作池:
from multiprocessing import Pool
from functools import lru_cache
@lru_cache(maxsize=10)
def fibonacci(n):
print('calculating fibonacci(%i)' %n)
if n < 2:
return n
return fibonacci(n-1)+fibonacci(n-2)
with Pool(processes=4) as pool:
for i in range(10):
res = pool.apply_async(fibonacci, (i,))
print(res.get())
print(fibonacci.cache_info())
Run Code Online (Sandbox Code Playgroud)
问题1
如何通过不同的工作人员共享缓存.另一个问题(如何共享缓存?)是在问类似的事情,但我无法让它工作.以下是我对此的两种失败方法.
使用multiprocessing.Pool:
from multiprocessing import Pool
from functools import lru_cache
import time
@lru_cache(maxsize=10)
def fibonacci(n):
print('calculating fibonacci(%i)' %n) # log whether the function gets called
if n < …Run Code Online (Sandbox Code Playgroud) TL;DR:如何以高性能的方式在多个进程之间共享大型(200MB)只读字典,该字典的访问量非常大,而每个进程在内存中都没有完整的副本。
编辑:看起来如果我只是将字典作为 multiprocessing.Pool/Process 的参数传递,它实际上不会创建副本,除非工作人员修改字典。我只是假设它会复制。这种行为似乎仅适用于 fork 可用的 Unix,但并不总是如此。但如果是这样,它应该可以解决我的问题,直到将其转换为 ETL 作业。
我正在尝试做的事情:
我的任务是改进将数据从一个商店复制到另一个商店的脚本。途中对数据进行标准化和转换。此任务的工作规模约为 1 亿个文档,这些文档来自源文档存储,这些文档被汇总并推送到另一个目标文档存储。
每个文档都有一个 ID,还有另一个文档存储,本质上是这些 ID 的键值存储,映射到此任务所需的一些附加信息。该存储要小得多,并且在来自主存储的文档通过时对其进行查询,如果没有大量缓存,那么这并不是一个真正的选择,并且大量缓存最终很快就会成为整个事物的副本。我只是在开始任何事情之前从整个商店创建整个字典字典并使用它。该字典大小约为 200MB。请注意,这本词典只能被读取。
为此,我设置了多处理并拥有大约 30 个并发进程。我对每个流程的工作进行了划分,以便每个流程都有不同的指标,并且可以在大约 4 小时内完成整个工作。
我注意到,在执行以下两件事时,我的 CPU 受到极大限制:
做这些事情时我会受到内存限制:
我强烈怀疑,除非我错过了一些东西,否则我将不得不“下载更多内存”或从Python重写为没有GIL的东西(或者使用ETL,就像应该在...中完成的那样)。
对于 ram 来说,存储这样的字典以减少其刺痛的最有效方法是什么?它目前是映射到由 3 个长整型/浮点组成的额外信息元组的标准字典。
doc_to_docinfo = { "ID1": (5.2, 3.0, 455), }
对于这个用例,是否有比我正在做的更有效的哈希图实现?
以下代码有效,但由于传递大型数据集而非常慢。在实际实现中,创建进程和发送数据的速度几乎和计算时间一样,所以到创建第二个进程的时候,第一个进程几乎已经完成了计算,并行化?无意义。
代码与此问题中的代码相同,Multiprocessing 在 992 个整数处截止,结果建议更改工作并在下面实现。但是,我遇到了我认为的其他人的常见问题,酸洗大数据需要很长时间。
我看到使用 multiprocessing.array 传递共享内存数组的答案。我有一个大约 4000 个索引的数组,但每个索引都有一个包含 200 个键/值对的字典。每个进程只读取数据,完成一些计算,然后返回一个矩阵(4000x3)(没有字典)。
这样的答案是只读共享复制到不同的流程,Python的多处理数据?使用地图。是否可以维护以下系统并实现共享内存?是否有一种有效的方法将数据发送到带有 dict 数组的每个进程,例如将 dict 包装在某个管理器中,然后将其放入 multiprocessing.array 中?
import multiprocessing
def main():
data = {}
total = []
for j in range(0,3000):
total.append(data)
for i in range(0,200):
data[str(i)] = i
CalcManager(total,start=0,end=3000)
def CalcManager(myData,start,end):
print 'in calc manager'
#Multi processing
#Set the number of processes to use.
nprocs = 3
#Initialize the multiprocessing queue so we can get the values returned to us
tasks = multiprocessing.JoinableQueue() …Run Code Online (Sandbox Code Playgroud) 我在程序开始时设置了numpy随机种子.在程序执行期间,我使用多次运行函数multiprocessing.Process.该函数使用numpy随机函数绘制随机数.问题是Process获取当前环境的副本.因此,每个进程都独立运行,它们都以与父环境相同的随机种子开始.
所以我的问题是如何在子环境中与父进程环境共享numpy的随机状态?请注意,我想Process用于我的工作,需要使用一个单独的类,并分别import numpy在该类中执行.我尝试使用multiprocessing.Manager共享随机状态,但似乎事情没有按预期工作,我总是得到相同的结果.此外,如果我将for循环移动到内部drawNumpySamples或将其留在其中并不重要main.py; 我仍然无法得到不同的数字,随机状态总是一样的.这是我的代码的简化版本:
# randomClass.py
import numpy as np
class myClass(self):
def __init__(self, randomSt):
print ('setup the object')
np.random.set_state(randomSt)
def drawNumpySamples(self, idx)
np.random.uniform()
Run Code Online (Sandbox Code Playgroud)
并在主文件中:
# main.py
import numpy as np
from multiprocessing import Process, Manager
from randomClass import myClass
np.random.seed(1) # set random seed
mng = Manager()
randomState = mng.list(np.random.get_state())
myC = myClass(randomSt = randomState)
for i in range(10):
myC.drawNumpySamples() # this will …Run Code Online (Sandbox Code Playgroud) 在 Python 3.6 中,我并行运行多个进程,其中每个进程 ping 一个 URL 并返回一个 Pandas 数据帧。我想继续运行(2+)个进程,我创建了一个最小的代表性示例,如下所示。
我的问题是:
1)我的理解是,由于我有不同的功能,我不能使用Pool.map_async()及其变体。是对的吗?我见过的唯一例子是重复相同的功能,就像这个答案一样。
2)使此设置永久运行的最佳实践是什么?在下面的代码中,我使用了一个while循环,我怀疑它不适合此目的。
3)Process我使用和 的方式是Manager最佳的吗?我使用multiprocessing.Manager.dict()共享字典来返回进程的结果。我在这个答案的评论中看到,使用Queuehere是有意义的,但是该Queue对象没有“.dict()”方法。所以,我不确定这会如何运作。
对于示例代码的任何改进和建议,我将不胜感激。
import numpy as np
import pandas as pd
import multiprocessing
import time
def worker1(name, t , seed, return_dict):
'''worker function'''
print(str(name) + 'is here.')
time.sleep(t)
np.random.seed(seed)
df= pd.DataFrame(np.random.randint(0,1000,8).reshape(2,4), columns=list('ABCD'))
return_dict[name] = [df.columns.tolist()] + df.values.tolist()
def worker2(name, t, seed, return_dict):
'''worker function'''
print(str(name) + …Run Code Online (Sandbox Code Playgroud) python ×11
multiprocessing ×10
python-3.x ×4
dictionary ×2
pandas ×2
queue ×2
arrays ×1
caching ×1
dataframe ×1
kwargs ×1
numpy ×1
python-2.7 ×1
recursion ×1
scipy ×1