我有一个脚本,通过imap_unordered()调用成功地执行多处理池任务集:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
Run Code Online (Sandbox Code Playgroud)
但是,我num_tasks大概是250,000,所以join()锁定主线程10秒左右,我希望能够逐步回显到命令行,以显示主进程未被锁定.就像是:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print "Waiting for", remaining, "tasks to complete..."
time.sleep(2)
Run Code Online (Sandbox Code Playgroud)
是否有结果对象或池本身的方法指示剩余的任务数量?我尝试使用一个multiprocessing.Value对象作为计数器(在完成任务后do_work调用一个counter.value += 1 …
是否有可能创建一个非守护进程的python池?我希望一个池能够调用一个内部有另一个池的函数.
我想要这个,因为deamon进程无法创建进程.具体来说,它会导致错误:
AssertionError: daemonic processes are not allowed to have children
Run Code Online (Sandbox Code Playgroud)
例如,考虑function_a具有运行的池的场景,该池具有运行function_b的池function_c.此函数链将失败,因为function_b正在守护进程中运行,并且守护进程无法创建进程.
我试着阅读http://docs.python.org/dev/library/multiprocessing.html上的文档,但我仍然在努力处理多处理队列,池和锁定.现在我能够构建下面的示例.
关于队列和池,我不确定我是否以正确的方式理解了这个概念,所以如果我错了,请纠正我.我想要实现的是在时间处理2个请求(在这个例子中数据列表有8个)所以,我应该使用什么?池创建2个进程,可以处理两个不同的队列(最多2个)或者我应该只使用Queue每次处理2个输入?锁定将正确打印输出.
import multiprocessing
import time
data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)
def mp_handler(var1):
for indata in var1:
p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
p.start()
def mp_worker(inputs, the_time):
print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
time.sleep(int(the_time))
print " Process %s\tDONE" % inputs
if __name__ == '__main__':
mp_handler(data)
Run Code Online (Sandbox Code Playgroud) 令人尴尬的并行问题通常包括三个基本部分:
我们可以在两个方面并行化程序:
这似乎是并发编程中最基本的模式,但我仍然试图解决它,所以让我们写一个规范的例子来说明如何使用多处理来完成.
下面是示例问题:给定一个包含整数行作为输入的CSV文件,计算它们的总和.将问题分成三个部分,这些部分可以并行运行:
下面是传统的单进程绑定Python程序,它解决了以下三个任务:
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# basicsums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file.
"""
import csv
import optparse
import sys
def make_cli_parser():
"""Make the command line interface parser."""
usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
__doc__,
""" …Run Code Online (Sandbox Code Playgroud) 背景:
我正在使用一个使用Django和Postgres数据库的项目.我们也在使用mod_wsgi以防万一,因为我的一些网络搜索提到了它.在Web表单提交中,Django视图启动了一项需要花费大量时间的工作(超过用户想要等待的时间),因此我们在后台通过系统调用启动工作.现在运行的作业需要能够读取和写入数据库.因为这项工作需要很长时间,所以我们使用多处理来并行运行部分工作.
问题:
顶级脚本具有数据库连接,当它生成子进程时,似乎父级的连接可供子级使用.然后有一个例外,关于如何在查询之前调用SET TRANSACTION ISOLATION LEVEL.研究表明,这是因为尝试在多个进程中使用相同的数据库连接.我发现一个线程建议在子进程的开头调用connection.close(),这样Django会在需要时自动创建一个新连接,因此每个子进程都有一个唯一的连接 - 即不共享.这对我来说不起作用,因为在子进程中调用connection.close()会导致父进程抱怨连接丢失.
其他调查结果:
我读过的一些东西似乎表明你不能真正做到这一点,而且多处理,mod_wsgi和Django不能很好地结合在一起.我觉得这似乎很难相信.
有人建议使用芹菜,这可能是一个长期的解决方案,但我目前无法安装芹菜,等待一些批准程序,所以现在不是一个选项.
在SO和其他地方找到了关于持久数据库连接的几个参考资料,我认为这是一个不同的问题.
还找到了关于psycopg2.pool和pgpool的引用以及关于bouncer的内容.不可否认,我不理解我正在阅读的大部分内容,但它肯定不会像我一样对我跳出来.
目前的"解决方案":
就目前而言,我已经恢复了只是连续运行的东西,它可以工作,但比我想要的慢.
关于如何使用多处理并行运行的任何建议?好像我可以让父母和两个孩子都拥有与数据库的独立连接,事情就可以了,但我似乎无法得到那种行为.
谢谢,抱歉这个长度!
我有一个60GB的SciPy数组(矩阵)我必须在5个以上的multiprocessing Process对象之间共享.我已经看过numpy-sharedmem并在SciPy列表上阅读了这个讨论.似乎有是两个approaches-- numpy-sharedmem和使用multiprocessing.RawArray(),并映射NumPy的dtypes到ctype秒.现在,numpy-sharedmem似乎是要走的路,但我还没有看到一个很好的参考例子.我不需要任何类型的锁,因为数组(实际上是矩阵)将是只读的.现在,由于它的大小,我想避免副本.这听起来像是正确的方法是创建唯一的数组作为副本sharedmem数组,然后将它传递给Process对象?几个具体问题:
将sharedmem句柄实际传递给子的最佳方法是Process()什么?我是否需要一个队列来传递一个阵列?管道会更好吗?我可以将它作为参数传递给Process()子类的init(我假设它被腌制)吗?
在上面我讨论过的讨论中,有人提到numpy-sharedmem不是64位安全吗?我肯定使用一些不是32位可寻址的结构.
这种RawArray()方法是否存在权衡?更慢,更笨?
我是否需要numpy-sharedmem方法的任何ctype-to-dtype映射?
有没有人有一些OpenSource代码这样做的例子?我是一个非常亲力实践的人,如果没有任何好的例子,很难让它工作.
如果我可以提供任何其他信息以帮助其他人澄清这一点,请发表评论,我将添加.谢谢!
这需要在Ubuntu Linux和Maybe Mac OS上运行,但可移植性不是一个大问题.
我正在尝试在Windows机器上使用线程和多处理的第一个正式的python程序.我无法启动进程,python提供以下消息.问题是,我没有在主模块中启动我的线程.线程在类中的单独模块中处理.
编辑:顺便说一句,这个代码在ubuntu上正常运行.不是在窗户上
RuntimeError:
Attempt to start a new process before the current process
has finished its bootstrapping phase.
This probably means that you are on Windows and you have
forgotten to use the proper idiom in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce a Windows executable.
Run Code Online (Sandbox Code Playgroud)
我的原始代码很长,但我能够在删节版本的代码中重现错误.它分为两个文件,第一个是主模块,除了导入处理进程/线程和调用方法的模块之外,它只做很少的事情.第二个模块是代码的核心所在.
testMain.py:
import parallelTestModule
extractor = parallelTestModule.ParallelExtractor()
extractor.runInParallel(numProcesses=2, numThreads=4)
Run Code Online (Sandbox Code Playgroud)
parallelTestModule.py:
import multiprocessing …Run Code Online (Sandbox Code Playgroud) 该multiprocessing模块的文档显示了如何将队列传递给以multiprocessing.Process.开头的进程.但是,如何与异步工作进程共享队列apply_async?我不需要动态加入或其他任何东西,只是工人(反复)将结果报告回基地的一种方式.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))
Run Code Online (Sandbox Code Playgroud)
这失败了:
RuntimeError: Queue objects should only be shared between processes through inheritance.我理解这意味着什么,我理解继承的建议,而不是要求pickle/unpickling(以及所有特殊的Windows限制).但如何做我传递队列中一个可行的办法?我找不到一个例子,我尝试了几种以各种方式失败的替代品.请帮忙?
python queue parallel-processing multiprocessing python-multiprocessing
如何在多进程python程序中捕获Ctrl + C并优雅地退出所有进程,我需要解决方案在unix和windows上工作.我尝试过以下方法:
import multiprocessing
import time
import signal
import sys
jobs = []
def worker():
signal.signal(signal.SIGINT, signal_handler)
while(True):
time.sleep(1.1234)
print "Working..."
def signal_handler(signal, frame):
print 'You pressed Ctrl+C!'
# for p in jobs:
# p.terminate()
sys.exit(0)
if __name__ == "__main__":
for i in range(50):
p = multiprocessing.Process(target=worker)
jobs.append(p)
p.start()
Run Code Online (Sandbox Code Playgroud)
它有点工作,但我不认为这是正确的解决方案.
编辑: 这可能与此重复
多任务,多道程序和多处理之间的区别是什么
这经常出现在我的大学操作系统考试中,我找不到一个好的答案.我对多任务和多道程序有很多了解,但需要确认一下.
multithreading operating-system multiprocessing multitasking
multiprocessing ×10
python ×8
concurrency ×1
django ×1
multitasking ×1
numpy ×1
pool ×1
python-2.7 ×1
queue ×1
signals ×1
windows ×1