mep*_*erp 1 python stdout multiprocessing
我正在尝试并行运行一些Python函数,该函数在整个函数中都有打印命令。我想要的是让每个子进程运行相同的功能,以分组的方式输出到主标准输出。我的意思是,我希望每个子流程的输出仅在完成其任务后才打印。但是,如果在此过程中发生某种错误,我仍要输出子过程中所做的任何事情。
一个小例子:
from time import sleep
import multiprocessing as mp
def foo(x):
print('foo')
for i in range(5):
print('Process {}: in foo {}'.format(x, i))
sleep(0.5)
if __name__ == '__main__':
pool = mp.Pool()
jobs = []
for i in range(4):
job = pool.apply_async(foo, args=[i])
jobs.append(job)
for job in jobs:
job.wait()
Run Code Online (Sandbox Code Playgroud)
这是并行运行的,但是输出的是:
foo
Process 0: in foo 0
foo
Process 1: in foo 0
foo
Process 2: in foo 0
foo
Process 3: in foo 0
Process 1: in foo 1
Process 0: in foo 1
Process 2: in foo 1
Process 3: in foo 1
Process 1: in foo 2
Process 0: in foo 2
Process 2: in foo 2
Process 3: in foo 2
Process 1: in foo 3
Process 0: in foo 3
Process 3: in foo 3
Process 2: in foo 3
Process 1: in foo 4
Process 0: in foo 4
Process 3: in foo 4
Process 2: in foo 4
Run Code Online (Sandbox Code Playgroud)
我想要的是:
foo
Process 3: in foo 0
Process 3: in foo 1
Process 3: in foo 2
Process 3: in foo 3
Process 3: in foo 4
foo
Process 1: in foo 0
Process 1: in foo 1
Process 1: in foo 2
Process 1: in foo 3
Process 1: in foo 4
foo
Process 0: in foo 0
Process 0: in foo 1
Process 0: in foo 2
Process 0: in foo 3
Process 0: in foo 4
foo
Process 2: in foo 0
Process 2: in foo 1
Process 2: in foo 2
Process 2: in foo 3
Process 2: in foo 4
Run Code Online (Sandbox Code Playgroud)
只要每个子过程的每个输出都分组在一起,那么哪个过程的特定顺序都没有关系。有趣的是,如果我这样做,我会得到所需的输出
python test.py > output
Run Code Online (Sandbox Code Playgroud)
我知道每个子进程都没有自己的标准输出,而是使用主标准输出。我已经思考并查找了一些解决方案,例如使其成为一个队列,并且每个子进程都有其自己的stdout,然后在完成后,我们重写flush命令,以便可以将输出输出回去。到队列。之后,我们可以阅读内容。但是,尽管这确实满足了我的要求,但是如果函数中途停止,则无法检索输出。仅在成功完成后才输出。从这里得到它在python中访问子进程的标准输出
我也看到了锁的用法,该方法有效,但是它完全杀死了并行运行该函数,因为它必须等待每个子进程执行执行函数foo的功能。
另外,如果可能的话,我想避免更改foo函数的实现,因为我需要更改许多功能。
编辑:我已经调查了库dispy和并行python。Dispy完全可以实现我想要的功能,它有一个单独的stdout / stderr,我可以在最后打印出来,但是dispy的问题是我必须在单独的终端中手动运行服务器。我希望能够一次全部运行python程序,而不必先打开另一个脚本。另一方面,并行Python可以满足我的要求,但是似乎缺少对它的控制,以及一些令人讨厌的麻烦。特别是,当您打印输出时,它也打印出函数的返回类型,我只想要使用print打印的输出。此外,在运行函数时,您必须为其提供使用的模块列表,这有点烦人,
正如您所注意到的,在这种情况下使用锁会杀死多进程,因为您实际上会让所有进程都等待当前持有STDOUT的“权限”的进程释放互斥量。但是,从逻辑上讲,并行运行并与功能/子流程同步打印是排他的。
相反,您可以做的是让主流程充当子流程的“打印机”,这样子流程一旦完成/出现错误,然后然后才将要打印的内容发送回主流程。您似乎非常满意打印不是“实时”的(无论如何也不能如前所述),因此这种方法应该恰好为您服务。所以:
import multiprocessing as mp
import random # just to add some randomness
from time import sleep
def foo(x):
output = ["[Process {}]: foo:".format(x)]
for i in range(5):
output.append('[Process {}] in foo {}'.format(x, i))
sleep(0.2 + 1 * random.random())
return "\n".join(output)
if __name__ == '__main__':
pool = mp.Pool(4)
for res in pool.imap_unordered(foo, range(4)):
print("[MAIN]: Process finished, response:")
print(res) # this will print as soon as one of the processes finishes/errors
pool.close()
Run Code Online (Sandbox Code Playgroud)
这会给你(当然是YMMV):
[MAIN]: Process finished, response:
[Process 2]: foo:
[Process 2] in foo 0
[Process 2] in foo 1
[Process 2] in foo 2
[Process 2] in foo 3
[Process 2] in foo 4
[MAIN]: Process finished, response:
[Process 0]: foo:
[Process 0] in foo 0
[Process 0] in foo 1
[Process 0] in foo 2
[Process 0] in foo 3
[Process 0] in foo 4
[MAIN]: Process finished, response:
[Process 1]: foo:
[Process 1] in foo 0
[Process 1] in foo 1
[Process 1] in foo 2
[Process 1] in foo 3
[Process 1] in foo 4
[MAIN]: Process finished, response:
[Process 3]: foo:
[Process 3] in foo 0
[Process 3] in foo 1
[Process 3] in foo 2
[Process 3] in foo 3
[Process 3] in foo 4
Run Code Online (Sandbox Code Playgroud)
您可以观察到其他任何东西,包括以同样的方式出现错误。
更新 -如果您绝对必须使用其输出无法控制的功能,则可以包装子流程并捕获其STDOUT / STDERR,然后一旦完成(或引发异常),就可以将所有内容返回给流程'manager '打印到实际的STDOUT。通过这样的设置,我们可以foo()像:
def foo(x):
print("[Process {}]: foo:".format(x))
for i in range(5):
print('[Process {}] in foo {}'.format(x, i))
sleep(0.2 + 1 * random.random())
if random.random() < 0.0625: # let's add a 1/4 chance to err:
raise Exception("[Process {}] A random exception is random!".format(x))
return random.random() * 100 # just a random response, you can omit it
Run Code Online (Sandbox Code Playgroud)
请注意,它非常高兴地不知道有什么东西试图破坏其操作模式。然后,我们将创建一个外部通用包装器(因此您不必依赖于功能对其进行更改)实际上是在搅乱其默认行为(不仅是此函数,还包括它在运行时可能调用的其他一切):
def std_wrapper(args):
try:
from StringIO import StringIO # ... for Python 2.x compatibility
except ImportError:
from io import StringIO
import sys
sys.stdout, sys.stderr = StringIO(), StringIO() # replace stdout/err with our buffers
# args is a list packed as: [0] process function name; [1] args; [2] kwargs; lets unpack:
process_name = args[0]
process_args = args[1] if len(args) > 1 else []
process_kwargs = args[2] if len(args) > 2 else {}
# get our method from its name, assuming global namespace of the current module/script
process = globals()[process_name]
response = None # in case a call fails
try:
response = process(*process_args, **process_kwargs) # call our process function
except Exception as e: # too broad but good enough as an example
print(e)
# rewind our buffers:
sys.stdout.seek(0)
sys.stderr.seek(0)
# return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
return sys.stdout.read(), sys.stderr.read(), response
Run Code Online (Sandbox Code Playgroud)
现在,我们需要的是调用此包装器而不是所需的包装器foo(),并为其提供有关代表我们调用什么的信息:
if __name__ == '__main__':
pool = mp.Pool(4)
# since we're wrapping the process we're calling, we need to send to the wrapper packed
# data with instructions on what to call on our behalf.
# info on args packing available in the std_wrapper function above.
for out, err, res in pool.imap_unordered(std_wrapper, [("foo", [i]) for i in range(4)]):
print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
print(out.rstrip()) # remove the trailing space for niceness, print err if you want
pool.close()
Run Code Online (Sandbox Code Playgroud)
因此,如果现在运行它,您将得到如下所示的内容:
[MAIN]: Process finished, response: None, STDOUT:
[Process 2]: foo:
[Process 2] in foo 0
[Process 2] in foo 1
[Process 2] A random exception is random!
[MAIN]: Process finished, response: 87.9658471743586, STDOUT:
[Process 1]: foo:
[Process 1] in foo 0
[Process 1] in foo 1
[Process 1] in foo 2
[Process 1] in foo 3
[Process 1] in foo 4
[MAIN]: Process finished, response: 38.929554421661194, STDOUT:
[Process 3]: foo:
[Process 3] in foo 0
[Process 3] in foo 1
[Process 3] in foo 2
[Process 3] in foo 3
[Process 3] in foo 4
[MAIN]: Process finished, response: None, STDOUT:
[Process 0]: foo:
[Process 0] in foo 0
[Process 0] in foo 1
[Process 0] in foo 2
[Process 0] in foo 3
[Process 0] in foo 4
[Process 0] A random exception is random!
Run Code Online (Sandbox Code Playgroud)
尽管foo()只是打印掉或出错。当然,您可以使用此类包装器来调用任何函数,并将任何数量的args / kwargs传递给该函数。
更新#2-但是,等一下!如果我们可以像这样包装我们的函数过程,并捕获其STDOUT / STDERR,那么我们当然可以将其转换为装饰器,并通过简单的装饰在我们的代码中使用它。因此,对于我的最终建议:
import functools
import multiprocessing
import random # just to add some randomness
import time
def std_wrapper(func):
@functools.wraps(func) # we need this to unravel the target function name
def caller(*args, **kwargs): # and now for the wrapper, nothing new here
try:
from StringIO import StringIO # ... for Python 2.x compatibility
except ImportError:
from io import StringIO
import sys
sys.stdout, sys.stderr = StringIO(), StringIO() # use our buffers instead
response = None # in case a call fails
try:
response = func(*args, **kwargs) # call our wrapped process function
except Exception as e: # too broad but good enough as an example
print(e) # NOTE: the exception is also printed to the captured STDOUT
# rewind our buffers:
sys.stdout.seek(0)
sys.stderr.seek(0)
# return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
return sys.stdout.read(), sys.stderr.read(), response
return caller
@std_wrapper # decorate any function, it won't know you're siphoning its STDOUT/STDERR
def foo(x):
print("[Process {}]: foo:".format(x))
for i in range(5):
print('[Process {}] in foo {}'.format(x, i))
time.sleep(0.2 + 1 * random.random())
if random.random() < 0.0625: # let's add a 1/4 chance to err:
raise Exception("[Process {}] A random exception is random!".format(x))
return random.random() * 100 # just a random response, you can omit it
Run Code Online (Sandbox Code Playgroud)
现在,我们可以像以前一样调用包装函数了,而无需处理参数打包或任何类似的事情,因此我们回到了:
if __name__ == '__main__':
pool = multiprocessing.Pool(4)
for out, err, res in pool.imap_unordered(foo, range(4)):
print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
print(out.rstrip()) # remove the trailing space for niceness, print err if you want
pool.close()
Run Code Online (Sandbox Code Playgroud)
输出与前面的示例相同,但包装更好且易于管理。
| 归档时间: |
|
| 查看次数: |
2290 次 |
| 最近记录: |