几年前,在Windows环境中,我做了一些测试,通过让多个CPU计算实例密集+内存访问密集型+ I/O访问密集型应用程序运行.我开发了两个版本:一个在多处理下运行,另一个在多线程下运行.
我发现多处理的性能要好得多.我在其他地方读过(但我不记得这个网站).
这说明原因是在多线程下,他们正在为单个内存管道和I/O管道"战斗",这使得与多处理相比性能更差
但是,我再也找不到那篇文章了.我想知道,直到今天,下面是否仍然如此?
在Windows中,如果算法代码在多处理下运行,则性能很可能优于多线程.
我有一小部分工人(4)和一大堆任务(5000~).我正在使用池并使用map_async()发送任务.因为我正在运行的任务相当长,所以我强制执行1的chunksize,这样一个长进程就无法阻止一些较短的进程.
我想做的是定期检查剩下的任务数量.我知道最多4个会活跃,我关心有多少人要处理.
我用Google搜索过,我找不到任何人这样做.
一些简单的代码可以帮助:
import multiprocessing
import time
def mytask(num):
print('Started task, sleeping %s' % num)
time.sleep(num)
pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()
while True:
if not jobs.ready():
print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
jobs.wait(2)
else:
break
Run Code Online (Sandbox Code Playgroud) 我正在编写一个启动两个进程的程序.
第一个进程,"客户端"发送两种类型的消息.
第一种类型增加了共享资源(int).第二种类型将资源设置为0.
在10条消息之后,客户端必须发送一个特殊类型的消息,该消息强制侦听两个队列的线程停止.因此,客户端在类型字段中发送两条带有特殊值的消息(每个队列一个),以终止线程.
第二个过程是"服务器".
服务器有三个线程:
第一个是监听"增加"队列.它必须处理增加请求,直到终止消息.所以我写道:
do{
msgrcv(id_i,&msg,dimensione,INCREMENTA,0);
pthread_mutex_lock(&mutex);
printf("THREAD 1: Il contatore vale:%d\n",*contatore);
incremento = msg.contenuto;
printf("THREAD 1: Incremento di : %d\n",incremento);
*contatore+=incremento;
printf("THREAD 1: Il contatore vale:%d\n",*contatore);
pthread_mutex_unlock(&mutex);
msgrcv(id_i,&msg,dimensione,TERMINA,IPC_NOWAIT); //IPC_NOWAIT or the thread will
freeze after the first message
}
while(msg.tipo!=TERMINA);
Run Code Online (Sandbox Code Playgroud)
第二个必须处理"设置为0"请求,直到终止消息.
do{msgrcv(id_a,&msg,dimensione,AZZERA,0);
pthread_mutex_lock(&mutex);
printf("THREAD 2: IL CONTATORE VALE:%d\n",*contatore);
*contatore=0;
printf("Thread 2: Contatore azzerato. Ora vale : %d\n",*contatore);
pthread_mutex_unlock(&mutex);
msgrcv(id_a,&msg,dimensione,TERMINA,IPC_NOWAIT);//IPC_NOWAIT or the thread will
freeze after the first message
}
while(msg.tipo!=TERMINA);
Run Code Online (Sandbox Code Playgroud)
第三个线程使用互斥锁增加资源的值以进入互斥.
问题是服务器进程的thread1和thread2没有终止它们应该的位置.事实上,他们在所有增加/ set0消息之后停留在第一个msgrcv()上.所以问题是两个线程无法管理监听终止消息.
我试图为第一个msgrcv设置IPC_NOWAIT但是没有用
我设置了一个进程来读取要下载的传入URL的队列,但是当urllib2打开连接时系统挂起.
import urllib2, multiprocessing
from threading import Thread
from Queue import Queue
from multiprocessing import Queue as ProcessQueue, Process
def download(url):
"""Download a page from an url.
url [str]: url to get.
return [unicode]: page downloaded.
"""
if settings.DEBUG:
print u'Downloading %s' % url
request = urllib2.Request(url)
response = urllib2.urlopen(request)
encoding = response.headers['content-type'].split('charset=')[-1]
content = unicode(response.read(), encoding)
return content
def downloader(url_queue, page_queue):
def _downloader(url_queue, page_queue):
while True:
try:
url = url_queue.get()
page_queue.put_nowait({'url': url, 'page': download(url)})
except Exception, err:
print u'Error …Run Code Online (Sandbox Code Playgroud) 我使用tqdm将进度条添加到了2.7 python代码中,但是它大大降低了我的代码速度。如果没有进度条,则需要12秒,而使用进度条则需要57秒。
没有进度条的代码如下所示:
p = mp.Pool()
combs = various combinations
result = p.map(self.parallelize, combs)
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
带有进度条的代码如下:
from tqdm import tqdm
p = mp.Pool()
combs = various combinations
result = list(tqdm(p.imap(self.parallelize, combs), total = 5000))
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
有没有一种更好的方法不会降低我的代码速度呢?
我已经看过几个关于这个主题的问题,但我没有得到完整答案......
我的代码基本上是:
from multiprocessing import Process
p = Process(target=f).start()
p.join()
def f():
print 'break!'
Run Code Online (Sandbox Code Playgroud)
我想在这个问题上设一个断点print.我正在使用pydev + eclipse(在Ubuntu上).
(请注意:有一个名为“ SQLite3 和 Multiprocessing ”的问题,但该问题实际上是关于多线程的,因此是公认的答案,这不是重复的)
我正在实现一个多进程脚本,每个进程都需要在一个 sqlite 表中写入一些结果。我的程序不断崩溃database is locked(使用 sqlite 一次只允许一个数据库修改)。
这是我所拥有的一个例子:
def scan(n):
n = n + 1 # Some calculation
cur.execute(" \
INSERT INTO hello \
(n) \
VALUES ('"+n+"') \
")
con.commit()
con.close()
return True
if __name__ == '__main__':
pool = Pool(processes=int(sys.argv[1]))
for status in pool.imap_unordered(scan, range(0,9999)):
if status:
print "ok"
pool.close()
Run Code Online (Sandbox Code Playgroud)
我已经尝试通过在 main 中声明一个锁并将其用作全局 in 来使用锁scan(),但这并没有阻止我获得database is locked.
确保在多进程 Python 脚本中同时只发出一个 INSERT 语句的正确方法是什么?
编辑:
我在基于 Debian 的 …
我有一个多维数组(result),应该由一些嵌套循环填充。函数fun()是一个复杂且耗时的函数。我想以并行方式填充数组元素,因此我可以使用系统的所有处理能力。这是代码:
import numpy as np
def fun(x, y, z):
# time-consuming computation...
# ...
return output
dim1 = 10
dim2 = 20
dim3 = 30
result = np.zeros([dim1, dim2, dim3])
for i in xrange(dim1):
for j in xrange(dim2):
for k in xrange(dim3):
result[i, j, k] = fun(i, j, k)
Run Code Online (Sandbox Code Playgroud)
我的问题是“我是否可以并行化此代码?如果可以,如何?”
我正在使用Windows 10 64位和python 2.7。
如果可以的话,请通过更改我的代码来提供您的解决方案。谢谢!
我在多进程场景中使用 sqlite。sqlite 库是使用线程安全序列化模式(-DSQLITE_THREADSAFE=1)编译的。
我希望收到有关数据更改和发现的通知sqlite3_update_hook。每个进程都会注册自己的更新挂钩,以便收到数据库更改的通知。
现在的问题是:如果进程A修改了数据库,进程B的update hook会被调用吗?或者钩子只能在同一进程或同一连接中工作吗?
遗憾的是,文档对此并不是很清楚。
我通过多进程池为每个子进程实例化一个 grpc 服务器。当我使用多个客户端访问服务器时,发现以下两个问题?
顺便说一下,我的开发环境是:
[OS]
ProductName: Mac OS X
ProductVersion: 10.14.6
BuildVersion: 18G5033
[packages]
grpcio = '1.30.0'
grpcio-tools = '1.30.0'
multiprocess = "0.70.10"
grpcio-status = "1.30.0"
googleapis-common-protos = "1.52.0"
[requires]
python_version = "3.8.3"
Run Code Online (Sandbox Code Playgroud)
这是服务器输出:
[PID 83287] Binding to 'localhost:52909'
[PID 83288] Starting new server.
[PID 83289] Starting new server.
[PID 83290] Starting new server.
[PID 83291] Starting new server.
[PID 83292] Starting new server.
[PID 83293] Starting new server.
[PID 83294] Starting new server.
[PID 83295] Starting …Run Code Online (Sandbox Code Playgroud) multiprocess ×10
python ×7
sqlite ×2
architecture ×1
c ×1
c++ ×1
debugging ×1
eclipse ×1
grpc ×1
grpc-python ×1
hook ×1
nested-loops ×1
pool ×1
progress-bar ×1
pydev ×1
python-3.x ×1
tqdm ×1
urllib2 ×1
windows ×1