我有以下设置:
do_some_processing(filename):
for line in file(filename):
if line.split(',')[0] in big_lookup_object:
# something here
if __name__ == '__main__':
big_lookup_object = marshal.load('file.bin')
pool = Pool(processes=4)
print pool.map(do_some_processing, glob.glob('*.data'))
Run Code Online (Sandbox Code Playgroud)
我正在将一些大对象加载到内存中,然后创建一个需要使用该大对象的工作池.大对象以只读方式访问,我不需要在进程之间传递它的修改.
我的问题是:加载到共享内存中的大对象,如果我在unix/c中生成进程,或者每个进程是否加载了自己的大对象副本?
更新:进一步澄清 - big_lookup_object是一个共享查找对象.我不需要拆分它并单独处理它.我需要保留一份副本.我需要分割它的工作是读取许多其他大文件,并在查找对象中查找这些大文件中的项目.
进一步更新:数据库是一个很好的解决方案,memcached可能是一个更好的解决方案,磁盘上的文件(shelve或dbm)可能更好.在这个问题中,我对内存解决方案特别感兴趣.对于最终的解决方案,我将使用hadoop,但我想看看我是否也可以拥有本地内存版本.
我更精确地使用Python多处理
from multiprocessing import Pool
p = Pool(15)
args = [(df, config1), (df, config2), ...] #list of args - df is the same object in each tuple
res = p.map_async(func, args) #func is some arbitrary function
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
这种方法具有巨大的内存消耗; 几乎占用了我所有的RAM(此时它变得非常慢,因此使多处理非常无用).我假设问题是这df是一个巨大的对象(一个大型的pandas数据帧),它会被复制到每个进程.我试过使用multiprocessing.Value共享数据帧而不复制
shared_df = multiprocessing.Value(pandas.DataFrame, df)
args = [(shared_df, config1), (shared_df, config2), ...]
Run Code Online (Sandbox Code Playgroud)
(正如Python多处理共享内存中所建议的那样),但是这给了我TypeError: this type has no size(与在Python进程之间共享一个复杂对象相同?,遗憾的是我不理解答案).
我第一次使用多处理,也许我的理解还不够好.是multiprocessing.Value实际上即使在这种情况下使用了正确的事情?我已经看到了其他建议(例如队列),但现在有点困惑.有什么选择可以共享内存,在这种情况下哪一个最好?
我需要在python中的几个进程之间共享一个对象及其方法.我正在尝试使用Managers(在模块多处理中)但它崩溃了.这是生产者 - 消费者的一个愚蠢的例子,其中两个进程之间的共享对象只是一个包含四种方法的数字列表.
from multiprocessing import Process, Condition, Lock
from multiprocessing.managers import BaseManager
import time, os
lock = Lock()
waitC = Condition(lock)
waitP = Condition(lock)
class numeri(object):
def __init__(self):
self.nl = []
def getLen(self):
return len(self.nl)
def stampa(self):
print self.nl
def appendi(self, x):
self.nl.append(x)
def svuota(self):
for i in range(len(self.nl)):
del self.nl[0]
class numManager(BaseManager):
pass
numManager.register('numeri', numeri, exposed = ['getLen', 'appendi', 'svuota', 'stampa'])
def consume(waitC, waitP, listaNumeri):
lock.acquire()
if (listaNumeri.getLen() == 0):
waitC.wait()
listaNumeri.stampa()
listaNumeri.svuota()
waitP.notify()
lock.release()
def produce(waitC, waitP, …Run Code Online (Sandbox Code Playgroud) 我想使用multiprocessing.Value在多个进程中使用变量,但Python的文档中的语法并不清楚.任何人都可以告诉我应该使用什么作为类型(我的变量是一个字母),以及在哪里放置我的变量的名称?
编辑
我尝试使用管理器在进程之间共享我的信件.但我现在唯一拥有的是Value('ctypes.c_char_p', '(你在这里点击的键)') 在Python Shell中打印但仍然没有声音.使用管理器时,控制台似乎比平常慢一点.在我按键和Value屏幕上显示之间有几乎一秒的延迟.
我的代码现在看起来像这样:
#Import
from tkinter import *
import wave
import winsound
import multiprocessing
#Functions
def key(event):
temp = event.char
manager = multiprocessing.Manager()
manager.Value(ctypes.c_char_p, temp)
hitkey = manager.Value(ctypes.c_char_p, temp)
instance = multiprocessing.Process(target=player, args=(hitkey,))
instance.start()
def player(hitkey):
print(hitkey + "1")
winsound.PlaySound(hitkey + '.wav', winsound.SND_FILENAME|winsound.SND_NOWAIT|winsound.SND_ASYNC)
if __name__ == "__main__":
#Initialisation
fenetre = Tk()
frame = Frame(fenetre, width=200, height=100)
#TK
frame.focus_set()
frame.bind("<Key>", key)
frame.pack()
fenetre.mainloop()
Run Code Online (Sandbox Code Playgroud) 简而言之
我想同时更改复杂的python对象,每个对象只由一个进程处理.我怎么能这样做(效率最高)?实施某种酸洗支持会有帮助吗?这会有效吗?
完整的问题
我有一个python数据结构ArrayDict,基本上由一个numpy数组和一个字典组成,并将任意索引映射到数组中的行.在我的例子中,所有键都是整数.
a = ArrayDict()
a[1234] = 12.5
a[10] = 3
print(a[1234]) #12.5
print(a[10]) # 3.0
print(a[1234] == a.array[a.indexDict[1234]]) #true
Run Code Online (Sandbox Code Playgroud)
现在我有多个这样的ArrayDicts并希望填写它们myMethod(arrayDict, params).由于myMethod价格昂贵,我想并行运行它.请注意,myMethod可能会添加许多行arrayDict.每个过程都会改变自己的过程ArrayDict.我不需要并发访问ArrayDicts.
在myMethod,我更改了条目arrayDict(即,我更改了内部numpy数组),我添加了条目arrayDict(也就是说,我向字典添加另一个索引并在内部数组中写入一个新值).最终,我希望能够在arrayDict内部numpy阵列变得太小时进行交换.这不会经常发生,如果没有更好的解决方案,我可以在程序的非并行部分执行此操作.即使没有阵列交换,我自己的尝试也没有成功.
我花了几天时间研究共享内存和python的多处理模块.由于我最终将在linux上工作,因此任务似乎相当简单:系统调用fork()允许有效地处理参数的副本.我的想法是ArrayDict在自己的进程中更改每个,返回对象的更改版本,并覆盖原始对象.为了节省内存并保存复制工作,我还使用了sharedmem数组来存储数据ArrayDict.我知道字典必须仍然被复制.
from sharedmem import sharedmem
import numpy as np
n = ... # length of …Run Code Online (Sandbox Code Playgroud) 对于这个问题,我参考了Python 文档中讨论“将SharedMemory类与NumPy数组一起使用,numpy.ndarray从两个不同的 Python shell访问相同的数组”中的示例。
我想实现的一个主要变化是操纵类对象的数组,而不是我在下面演示的整数值。
import numpy as np
from multiprocessing import shared_memory
# a simplistic class example
class A():
def __init__(self, x):
self.x = x
# numpy array of class objects
a = np.array([A(1), A(2), A(3)])
# create a shared memory instance
shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name='psm_test0')
# numpy array backed by shared memory
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
# copy the original data into shared memory
b[:] = …Run Code Online (Sandbox Code Playgroud) 我使用的库为外部程序提供python接口.这允许我创建:
foo = Foo()
Run Code Online (Sandbox Code Playgroud)
上面的代码启动了我可以在python中控制的Foo程序的新实例.
我有一个python脚本,需要多次调用,并根据外部参数,告诉外部Foo程序的单个实例做不同的事情.显然我做不到
foo = Foo() 每次,
因为每次我的脚本运行时都会创建一个新的Foo实例.
我想要做的是创建foo= Foo()一次,并让多个调用共享同一个实例.目前我只想创建一次,序列化它,并让我的脚本反序列化它.这种方法有效吗?还有更好的选择吗?
谢谢!!
我不知道为什么,但是每当我尝试传递给共享对象共享自定义类对象的方法时,我都会收到这个奇怪的错误。Python版本:3.6.3
代码:
from multiprocessing.managers import SyncManager
class MyManager(SyncManager): pass
class MyClass: pass
class Wrapper:
def set(self, ent):
self.ent = ent
MyManager.register('MyClass', MyClass)
MyManager.register('Wrapper', Wrapper)
if __name__ == '__main__':
manager = MyManager()
manager.start()
try:
obj = manager.MyClass()
lst = manager.list([1,2,3])
collection = manager.Wrapper()
collection.set(lst) # executed fine
collection.set(obj) # raises error
except Exception as e:
raise
Run Code Online (Sandbox Code Playgroud)
错误:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "D:\Program Files\Python363\lib\multiprocessing\managers.py", line 228, in serve_client
request = recv()
File "D:\Program Files\Python363\lib\multiprocessing\connection.py", line 251, in recv
return …Run Code Online (Sandbox Code Playgroud) 我寻找了其他问题,这个未接受的答案问题是我能找到的唯一一个以某种方式涵盖了这个问题并且没有真正帮助的问题。另外,我需要它来处理进程,而不是线程。
因此,我从头开始编写了一个示例程序来展示我的问题,您应该能够粘贴它并且它将运行:
import multiprocessing
import time
class Apple:
def __init__(self, color):
self.color = color
def thinkAboutApple(apple):
while True:
print(apple.color)
time.sleep(2)
my_apple = Apple("red")
new_process = multiprocessing.Process(target=thinkAboutApple, args=(my_apple,))
new_process.start()
time.sleep(4)
print("new: brown")
my_apple.color = "brown"
#so that the program doesn't exit after time.sleep(4)
while True:
pass
Run Code Online (Sandbox Code Playgroud)
# actual output | # wanted output
red | red
red | red
new: brown | new: brown
red | brown
red | brown
Run Code Online (Sandbox Code Playgroud)
这告诉我,要么苹果处于一个奇怪的假设,它同时有两种颜色,要么 new_process' 苹果位于 ram 中的另一个位置,并与主进程中的苹果分开。
所以问题是:有没有办法让进程中的苹果指针指向同一个苹果,或者什么是Pythonic方法来保持所有进程中苹果的所有实例相同?如果我在许多进程中有相同的苹果,甚至更多进程没有苹果,我如何确保它们始终相同?
如何与另一个流程共享一个流程的价值?显然我可以通过多线程而不是多处理来做到这一点。多线程对于我的程序来说很慢。
我无法显示我的确切代码,所以我做了这个简单的例子。
from multiprocessing import Process
from threading import Thread
import time
class exp:
def __init__(self):
self.var1 = 0
def func1(self):
self.var1 = 5
print(self.var1)
def func2(self):
print(self.var1)
if __name__ == "__main__":
#multithreading
obj1 = exp()
t1 = Thread(target = obj1.func1)
t2 = Thread(target = obj1.func2)
print("multithreading")
t1.start()
time.sleep(1)
t2.start()
time.sleep(3)
#multiprocessing
obj = exp()
p1 = Process(target = obj.func1)
p2 = Process(target = obj.func2)
print("multiprocessing")
p1.start()
time.sleep(2)
p2.start()
Run Code Online (Sandbox Code Playgroud)
预期输出:
from multiprocessing import Process
from threading import Thread
import …Run Code Online (Sandbox Code Playgroud) python multiprocessing python-multithreading python-3.x python-multiprocessing