我在尝试理解多处理队列如何在python上工作以及如何实现它时遇到了很多麻烦.假设我有两个从共享文件访问数据的python模块,让我们将这两个模块称为编写者和读者.我的计划是让读取器和写入器将请求放入两个单独的多处理队列,然后让第三个进程在循环中弹出这些请求并执行.
我的主要问题是我真的不知道如何正确实现multiprocessing.queue,你不能真正实例化每个进程的对象,因为它们将是独立的队列,你如何确保所有进程都与共享队列相关(或者在这种情况下,队列)
我发现在Python 3.4中,很少有用于多处理/线程的不同库:多处理与线程和asyncio.
但我不知道使用哪一个或是"推荐的".他们做同样的事情,还是不同?如果是这样,哪一个用于什么?我想编写一个在我的计算机中使用多核的程序.但我不知道我应该学习哪个图书馆.
python multithreading multiprocessing python-3.x python-asyncio
我有一个大文本文件,我想处理每一行(做一些操作)并将它们存储在数据库中.由于单个简单程序花费的时间太长,我希望它可以通过多个进程或线程来完成.每个线程/进程应从该单个文件中读取不同的数据(不同的行),并对其数据(行)执行一些操作并将它们放入数据库中,以便最终处理完所有数据并进行处理.数据库与我需要的数据一起转储.
但我无法弄清楚如何处理这个问题.
为了使我的代码更"pythonic"和更快,我使用"多处理"和一个map函数发送它a)函数和b)迭代范围.
植入的解决方案(即直接在范围tqdm.tqdm(范围(0,30))上调用tqdm不适用于多处理(如下面的代码所示).
进度条显示从0到100%(当python读取代码?)但它不指示map函数的实际进度.
如何显示一个进度条,指示"地图"功能在哪一步?
from multiprocessing import Pool
import tqdm
import time
def _foo(my_number):
square = my_number * my_number
time.sleep(1)
return square
if __name__ == '__main__':
p = Pool(2)
r = p.map(_foo, tqdm.tqdm(range(0, 30)))
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
欢迎任何帮助或建议......
我有一个CPU密集型Celery任务.我想在许多EC2实例中使用所有处理能力(核心)来更快地完成这项工作(我认为芹菜并行分布式多任务处理任务).
术语,线程,多处理,分布式计算,分布式并行处理都是我试图更好理解的术语.
示例任务:
@app.task
for item in list_of_millions_of_ids:
id = item # do some long complicated equation here very CPU heavy!!!!!!!
database.objects(newid=id).save()
Run Code Online (Sandbox Code Playgroud)
使用上面的代码(如果可能的话,有一个例子)如何使用Celery分配这个任务,允许利用云中所有可用机器的所有计算CPU功率来分离这一任务?
有没有办法在python中使用multiprocessing.Process类时记录给定进程的stdout输出?
我知道这已经得到了回答,但似乎直接执行脚本"python filename.py"不起作用.我在SuSE Linux上有Python 2.6.2.
码:
#!/usr/bin/python
# -*- coding: utf-8 -*-
from multiprocessing import Pool
p = Pool(1)
def f(x):
return x*x
p.map(f, [1, 2, 3])
Run Code Online (Sandbox Code Playgroud)
命令行:
> python example.py
Process PoolWorker-1:
Traceback (most recent call last):
File "/usr/lib/python2.6/multiprocessing/process.py", line 231, in _bootstrap
self.run()
File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python2.6/multiprocessing/pool.py", line 57, in worker
task = get()
File "/usr/lib/python2.6/multiprocessing/queues.py", line 339, in get
return recv()
AttributeError: 'module' object has no attribute 'f'
Run Code Online (Sandbox Code Playgroud) 似乎当从multiprocessing.Pool进程引发异常时,没有堆栈跟踪或任何其他指示它已失败.例:
from multiprocessing import Pool
def go():
print(1)
raise Exception()
print(2)
p = Pool()
p.apply_async(go)
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
打印1并静默停止.有趣的是,提高BaseException会起作用.有没有办法让所有异常的行为与BaseException相同?
我正在尝试解决一个涉及大量子问题的大数值问题,我正在使用Python的多处理模块(特别是Pool.map)将不同的独立子问题拆分到不同的核心上.每个子问题涉及计算大量的子子问题,我试图通过将它们存储到文件中来有效地记忆这些结果,如果它们还没有被任何进程计算,否则跳过计算并只读取文件中的结果.
我有文件的并发问题:不同的进程有时检查是否已经计算了子子问题(通过查找存储结果的文件),看到它没有,运行计算,然后尝试同时将结果写入同一文件.我该如何避免写这样的碰撞?
我有一个SQL Server(2012),我使用Entity Framework(4.1)访问.在数据库中,我有一个名为URL的表,一个独立的进程在其中提供新的URL.URL表中的条目可以处于"新建","处理中"或"已处理"状态.
我需要从不同的计算机访问URL表,检查状态为"新"的URL条目,取第一个并将其标记为"正在处理".
var newUrl = dbEntity.URLs.FirstOrDefault(url => url.StatusID == (int) URLStatus.New);
if(newUrl != null)
{
newUrl.StatusID = (int) URLStatus.InProcess;
dbEntity.SaveChanges();
}
//Process the URL
Run Code Online (Sandbox Code Playgroud)
由于查询和更新不是原子的,我可以读取两个不同的计算机并更新数据库中的相同URL条目.
有没有办法让select-then-update序列原子化以避免这种冲突?
multiprocessing ×10
python ×9
c# ×1
celery ×1
concurrency ×1
django ×1
exception ×1
io ×1
logging ×1
mutex ×1
progress-bar ×1
python-3.x ×1
sql-server ×1
tqdm ×1