我有一个单线程Python程序,我想修改它以使用它运行的服务器上的所有32个处理器.正如我所设想的那样,每个工作进程都会从队列中接收其作业并将其输出提交到队列.但是,要完成其工作,每个工作进程都需要只读访问复杂的内存中数据结构 - 许多千兆字节的dicts和彼此链接的对象.在python中,有没有一种简单的方法来共享这个数据结构,而不为每个工作进程复制它?
谢谢.
如果我调用apply_async 10,000次,假设OOM杀手没有干扰,多处理会同时启动它们,还是会批量启动它们.例如..每100次启动,等待90次完成启动再开始之前?
达斯汀
我有一个类似的python脚本:
def test_run():
global files_dir
for f1 in os.listdir(files_dir):
for f2 os.listdir(files_dir):
os.system("run program x on f1 and f2")
Run Code Online (Sandbox Code Playgroud)
os.system在不同处理器上调用每个呼叫的最佳方法是什么?使用子进程或多处理池?
注意:程序的每次运行都将生成一个输出文件.
我有两个进程,一个将作业添加到队列中,另一个将它们从同一队列中取出并运行它们.这应该按预期工作,我不知道为什么worker永远不会得到任何工作.这是我的代码:
from multiprocessing import Process
from Queue import Queue
import time
q = Queue()
def queuer():
while True:
q.put("JOB")
print "Adding JOB"
time.sleep(1)
def worker():
while True:
if not q.empty():
item = q.get()
print "Running", item
else:
print "No jobs"
time.sleep(1)
a = Process(target=queuer)
a.start()
b = Process(target=worker)
b.start()
Run Code Online (Sandbox Code Playgroud) 和之sum1和sum2必须等499500于此代码的末尾,但它打印出来0,为什么?
import multiprocessing
sum1 = 0
sum2 = 0
def list_append_1(out_list):
global sum1
for i in out_list:
sum1 += i
print "sum1: ", sum1
def list_append_2(out_list):
global sum2
for i in out_list:
sum2 += i
print "sum2: ", sum2
if __name__ == "__main__":
lista_1 = [i for i in xrange(500)]# Number of random numbers to add
lista_2 = [i for i in xrange(500,1000)]
procs = 2 # Number of processes to create …Run Code Online (Sandbox Code Playgroud) 在Python中,我使用了一个名为的库futures,它允许我以简洁明了的方式对N个工作进程池进行处理:
schedulerQ = []
for ... in ...:
workParam = ... # arguments for call to processingFunction(workParam)
schedulerQ.append(workParam)
with futures.ProcessPoolExecutor(max_workers=5) as executor: # 5 CPUs
for retValue in executor.map(processingFunction, schedulerQ):
print "Received result", retValue
Run Code Online (Sandbox Code Playgroud)
(这processingFunction是CPU绑定的,所以这里的异步机器没有意义 - 这是关于简单的旧算术计算)
我现在正在寻找在Scala中做同样事情的最接近的方法.请注意,在Python中,为了避免GIL问题,我使用了进程(因此使用了ProcessPoolExecutor代替ThreadPoolExecutor) - 并且库自动地将workParam参数封送到每个执行的流程实例processingFunction(workParam)- 并且它将结果封送回主进程,执行者的map循环消费.
这适用于Scala和JVM吗?我的processingFunction原则上也可以从线程执行(根本没有全局状态) - 但是我有兴趣看到多处理和多线程的解决方案.
问题的关键部分是JVM世界中是否有任何与futures上面看到的Python一样明确的API ...我认为这是我见过的最好的SMP API之一 - 准备一个列表使用所有调用的函数参数,然后只有两行:创建poolExecutor和map处理函数,一旦工作人员生成结果就返回结果.一旦第一次调用processingFunction返回结果,结果就会立即开始,直到它们全部完成为止 - 此时for循环结束.
PIPES是单向的,可以在通信中同步或异步.
但是如何在一个进程上创建一个双向命名管道,在另一个进程上创建另一个端点,这两个进程共享同一个内存,并且存在于同一操作系统中并异步通信.
有办法吗?
我试图掌握Python中的多处理.我从创建此代码开始.它只是计算整数i的cos(i),并测量一个人使用多处理时和一个人不使用时的时间.我没有观察到任何时差.这是我的代码:
import multiprocessing
from multiprocessing import Pool
import numpy as np
import time
def tester(num):
return np.cos(num)
if __name__ == '__main__':
starttime1 = time.time()
pool_size = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=pool_size,
)
pool_outputs = pool.map(tester, range(5000000))
pool.close()
pool.join()
endtime1 = time.time()
timetaken = endtime1 - starttime1
starttime2 = time.time()
for i in range(5000000):
tester(i)
endtime2 = time.time()
timetaken2 = timetaken = endtime2 - starttime2
print( 'The time taken with multiple processes:', timetaken)
print( 'The time taken the usual way:', timetaken2)
Run Code Online (Sandbox Code Playgroud)
我观察到两次测量之间没有(或非常小的)差异.我正在使用8芯机器,所以这是令人惊讶的.我在代码中做错了什么? …
python parallel-processing ipython multiprocessing python-3.x
我有一个Web应用程序,当一个用户使用时运行正常,但随着越来越多的客户端开始使用它,它是无法忍受的缓慢.服务器端是用python编写的,并使用龙卷风.我注意到,虽然它运行的服务器有4个内核,但只有1个正在使用,所以我开始关注python的多处理.我已经看到了以下基本示例:http://sebastianraschka.com/Articles/2014_multiprocessing_intro.html和龙卷风处理器来自:http: //tornado.readthedocs.org/en/latest/_modules/tornado/process.html(这似乎有点复杂),但我仍然不确定这是我正在寻找的.如果有50个用户一次查看它,那么使用4个处理器运行此代码会加快速度吗?如果是这样的选项之一,那么在使用龙卷风Web服务器时可以采用哪种方式?
对于模糊和写得不好的问题感到抱歉 - 即使经过广泛的研究,我对多处理的缺乏经验也应归咎于此.如果任何示例代码有助于回答此问题,请告诉我们.
谢谢!
我是多处理程序包的新手,我无法完成任务.
我在对象列表上有很多计算要做.
我需要写下的结果也保存在这些对象中.
一旦过程完成计算(我至少工作的方式,等待所有计算完成),结果应该写在一个文件中.
import multiprocessing
import time
import csv
class simpl():
def __init__(self, name, val):
self.name = name
self.val = val
def pot_val(inpt):
print("Process %s\t ..." % (inpt.name))
old_v = inpt.val
inpt.val *= inpt.val
if old_v != 8:
time.sleep(old_v)
print("Process %s\t ... Done" % (inpt.name))
def mp_worker(inpt):
pot_val(inpt)
return inpt
def mp_handler(data_list):
p = multiprocessing.Pool(4)
with open('results.csv', 'a') as f:
res = p.map_async(mp_worker, data_list)
results = (res.get())
for result in results:
print("Writing result for ",result.name)
writer= csv.writer(f, lineterminator = …Run Code Online (Sandbox Code Playgroud) multiprocessing ×10
python ×8
high-volume ×1
ipython ×1
kernel ×1
linux ×1
linux-kernel ×1
python-3.x ×1
scala ×1
server ×1
tornado ×1
web ×1