使用 cProfile 分析一个原本运行完全正常的 multiprocessing Python 脚本时出错

bli*_*bli 7 python pickle cprofile

我编写了一个使用 multiprocessing 的小型 Python 脚本(参见 /sf/answers/2931299801/)。直接运行它进行测试时,它工作正常:

$ ./forkiter.py
0
1
2
3
4
sum of x+1: 15
sum of 2*x: 20
sum of x*x: 30
Run Code Online (Sandbox Code Playgroud)

但是当我尝试用 cProfile 对其进行性能分析时,会得到以下结果:

$ python3.6 -m cProfile -o forkiter.prof ./forkiter.py
0
1
2
3
4
Traceback (most recent call last):
  File "/home/bli/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/home/bli/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/bli/lib/python3.6/cProfile.py", line 160, in <module>
    main()
  File "/home/bli/lib/python3.6/cProfile.py", line 153, in main
    runctx(code, globs, None, options.outfile, options.sort)
  File "/home/bli/lib/python3.6/cProfile.py", line 20, in runctx
    filename, sort)
  File "/home/bli/lib/python3.6/profile.py", line 64, in runctx
    prof.runctx(statement, globals, locals)
  File "/home/bli/lib/python3.6/cProfile.py", line 100, in runctx
    exec(cmd, globals, locals)
  File "./forkiter.py", line 71, in <module>
    exit(main())
  File "./forkiter.py", line 67, in main
    sum_tuples, results_generator))
  File "/home/bli/lib/python3.6/multiprocessing/pool.py", line 699, in next
    raise value
  File "/home/bli/lib/python3.6/multiprocessing/pool.py", line 385, in _handle_tasks
    put(task)
  File "/home/bli/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/bli/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '__main__.FuncApplier'>: attribute lookup FuncApplier on __main__ failed
Run Code Online (Sandbox Code Playgroud)

怎么了?

这是脚本:

#!/usr/bin/env python3
"""This script tries to work around some limitations of multiprocessing."""

from itertools import repeat, starmap
from multiprocessing import Pool
from functools import reduce
from operator import add
from time import sleep

# Doesn't work because local functions can't be pickled:
# def make_tuple_func(funcs):
#     def tuple_func(args_list):
#         return tuple(func(args) for func, args in zip(funcs, args_list))
#     return tuple_func
#
# test_tuple_func = make_tuple_func((plus_one, double, square))

class FuncApplier(object):
    """Bundle several one-argument callables into a single callable object.

    Calling an instance with a sequence of arguments applies each stored
    function to the argument at the same position and returns the results
    as a tuple.
    """
    __slots__ = ("funcs", )

    def __init__(self, funcs):
        """Remember *funcs*, the sequence of callables to apply."""
        self.funcs = funcs

    def __len__(self):
        """Return how many functions are bundled."""
        return len(self.funcs)

    def __call__(self, args_list):
        """Apply the i-th function to the i-th item of *args_list*.

        Functions and arguments are paired positionally; pairing stops at
        the shorter of the two sequences.
        """
        def apply_one(func, arg):
            return func(arg)

        return tuple(map(apply_one, self.funcs, args_list))

    def fork_args(self, args_list):
        """Return a tuple holding *args_list* once per bundled function."""
        return (args_list,) * len(self)


def sum_tuples(*tuples):
    """Add the given tuples position by position and return a new tuple.

    The result is as long as the shortest input; with no input it is empty.
    """
    return tuple(add(*column) for column in zip(*tuples))


# Can't define these functions in main:
# They wouldn't be pickleable.
def plus_one(x):
    """Return *x* increased by one."""
    return add(x, 1)

def double(x):
    """Return twice the value of *x*."""
    doubled = 2 * x
    return doubled

def square(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared

def main():
    """Apply plus_one, double and square to 0..4 in a pool and print the sums.

    Each generated integer is forked into one copy per function, the
    FuncApplier is mapped over those tuples in a 5-process pool, and the
    per-function results are summed element-wise and printed.
    The function ends by calling exit(0).
    """
    def my_generator():
        """Yield 0..4, printing each value when it is produced."""
        for value in range(5):
            print(value)
            yield value

    test_tuple_func = FuncApplier((plus_one, double, square))
    # Lazy: nothing is generated until the pool starts consuming it.
    forked_inputs = (
        test_tuple_func.fork_args(args_list) for args_list in my_generator())

    with Pool(processes=5) as pool:
        results_generator = pool.imap_unordered(test_tuple_func, forked_inputs)
        # Results arrive in arbitrary order; the element-wise sum does not care.
        totals = reduce(sum_tuples, results_generator)
        print("sum of x+1:\t%s\nsum of 2*x:\t%s\nsum of x*x:\t%s" % totals)
    exit(0)

# Script entry point: run the demo and pass main()'s result to exit().
if __name__ == "__main__":
    exit_status = main()
    exit(exit_status)
Run Code Online (Sandbox Code Playgroud)

一些 pickle 序列化测试

我查阅了一些资料,发现对象有时需要定义 __setstate__ 和 __getstate__ 方法才能被 pickle 序列化。这对某些 pickle 协议有帮助,但似乎并不能解决在 cProfile 下运行时的问题。请参阅下面的测试。

更新的脚本:

#!/usr/bin/env python3
"""This script tries to work around some limitations of multiprocessing."""

from itertools import repeat, starmap
from multiprocessing import Pool
from functools import reduce
from operator import add
from time import sleep
import pickle

# Doesn't work because local functions can't be pickled:
# def make_tuple_func(funcs):
#     def tuple_func(args_list):
#         return tuple(func(args) for func, args in zip(funcs, args_list))
#     return tuple_func
#
# test_tuple_func = make_tuple_func((plus_one, double, square))

class FuncApplier(object):
    """Group several one-argument functions into one picklable callable.

    Calling an instance with a sequence of arguments applies the i-th
    function to the i-th argument and returns the results as a tuple.
    """
    __slots__ = ("funcs", )

    def __init__(self, funcs):
        """Store *funcs*, the sequence of callables to apply."""
        self.funcs = funcs

    def __len__(self):
        """Return the number of grouped functions."""
        return len(self.funcs)

    def __call__(self, args_list):
        """Return the tuple of ``func(arg)`` for each positional pair.

        Pairing uses ``zip``, so it stops at the shorter of ``self.funcs``
        and *args_list*.
        """
        return tuple(func(args) for func, args in zip(self.funcs, args_list))

    # Explicit state hooks: an attempt to make instances picklable under
    # cProfile. They do not fix that failure, which is an attribute lookup
    # on __main__, but they do define the pickled state explicitly.
    def __getstate__(self):
        """Return the state to pickle as a dict that is never empty.

        pickle does not call ``__setstate__`` when the state is falsy.
        Returning ``self.funcs`` directly would therefore drop the ``funcs``
        attribute of an instance created with an empty sequence.
        """
        return {"funcs": self.funcs}

    def __setstate__(self, state):
        """Restore ``funcs`` from *state*.

        Accepts the dict produced by ``__getstate__`` and, for backward
        compatibility, the bare ``funcs`` sequence used as state before.
        """
        if isinstance(state, dict):
            self.funcs = state["funcs"]
        else:
            self.funcs = state

    def fork_args(self, args_list):
        """Return a tuple repeating *args_list* once per grouped function."""
        return tuple(repeat(args_list, len(self)))


def sum_tuples(*tuples):
    """Add the given tuples position by position and return a new tuple.

    The result is as long as the shortest input; with no input it is empty.
    """
    return tuple(add(*column) for column in zip(*tuples))


# Can't define these functions in main:
# They wouldn't be pickleable.
def plus_one(x):
    """Return *x* increased by one."""
    return add(x, 1)

def double(x):
    """Return twice the value of *x*."""
    doubled = 2 * x
    return doubled

def square(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared

def main():
    """Try pickling a FuncApplier with protocols 0-4, then run the pool demo.

    For each pickle protocol from 0 to 4, the serialized bytes are printed,
    or the error message if pickling raises ``pickle.PicklingError``. Then
    plus_one, double and square are applied to 0..4 in a 5-process pool and
    the element-wise sums of the results are printed.

    Returns:
        0, meant to be passed to ``exit`` by the script entry point.
    """
    def my_generator():
        """Yield 0..4, printing each value when it is produced."""
        for i in range(5):
            print(i)
            yield i

    test_tuple_func = FuncApplier((plus_one, double, square))

    # Use the protocol that is announced: the copy-pasted version always
    # passed 0, so protocols 1-4 were never actually tested.
    for protocol in range(5):
        print("protocol %d" % protocol)
        try:
            print(pickle.dumps(test_tuple_func, protocol))
        except pickle.PicklingError as err:
            print("failed with the following error:\n%s" % err)

    with Pool(processes=5) as pool:
        results_generator = pool.imap_unordered(
            test_tuple_func,
            (test_tuple_func.fork_args(args_list) for args_list in my_generator()))
        print("sum of x+1:\t%s\nsum of 2*x:\t%s\nsum of x*x:\t%s" % reduce(
            sum_tuples, results_generator))
    # Return the status instead of calling exit() here: the entry point
    # already does exit(main()).
    return 0

# Script entry point: run the tests and demo, then pass main()'s result to exit().
if __name__ == "__main__":
    exit_status = main()
    exit(exit_status)
Run Code Online (Sandbox Code Playgroud)

不使用 cProfile 时,测试结果正常:

$ ./forkiter.py
protocol 0
b'ccopy_reg\n_reconstructor\np0\n(c__main__\nFuncApplier\np1\nc__builtin__\nobject\np2\nNtp3\nRp4\n(c__main__\nplus_one\np5\nc__main__\ndouble\np6\nc__main__\nsquare\np7\ntp8\nb.'
protocol 1
b'ccopy_reg\n_reconstructor\np0\n(c__main__\nFuncApplier\np1\nc__builtin__\nobject\np2\nNtp3\nRp4\n(c__main__\nplus_one\np5\nc__main__\ndouble\np6\nc__main__\nsquare\np7\ntp8\nb.'
protocol 2
b'ccopy_reg\n_reconstructor\np0\n(c__main__\nFuncApplier\np1\nc__builtin__\nobject\np2\nNtp3\nRp4\n(c__main__\nplus_one\np5\nc__main__\ndouble\np6\nc__main__\nsquare\np7\ntp8\nb.'
protocol 3
b'ccopy_reg\n_reconstructor\np0\n(c__main__\nFuncApplier\np1\nc__builtin__\nobject\np2\nNtp3\nRp4\n(c__main__\nplus_one\np5\nc__main__\ndouble\np6\nc__main__\nsquare\np7\ntp8\nb.'
protocol 4
b'ccopy_reg\n_reconstructor\np0\n(c__main__\nFuncApplier\np1\nc__builtin__\nobject\np2\nNtp3\nRp4\n(c__main__\nplus_one\np5\nc__main__\ndouble\np6\nc__main__\nsquare\np7\ntp8\nb.'
0
1
2
3
4
sum of x+1: 15
sum of 2*x: 20
sum of x*x: 30
Run Code Online (Sandbox Code Playgroud)

在 cProfile 下,每个 pickle 协议的测试都失败(因此 multiprocessing 也失败):

$ python3.6 -m cProfile -o forkiter.prof ./forkiter.py
protocol 0
failed with the following error:
Can't pickle <class '__main__.FuncApplier'>: attribute lookup FuncApplier on __main__ failed
protocol 1
failed with the following error:
Can't pickle <class '__main__.FuncApplier'>: attribute lookup FuncApplier on __main__ failed
protocol 2
failed with the following error:
Can't pickle <class '__main__.FuncApplier'>: attribute lookup FuncApplier on __main__ failed
protocol 3
failed with the following error:
Can't pickle <class '__main__.FuncApplier'>: attribute lookup FuncApplier on __main__ failed
protocol 4
failed with the following error:
Can't pickle <class '__main__.FuncApplier'>: attribute lookup FuncApplier on __main__ failed
0
1
2
3
4
Traceback (most recent call last):
  File "/home/bli/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/home/bli/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/bli/lib/python3.6/cProfile.py", line 160, in <module>
    main()
  File "/home/bli/lib/python3.6/cProfile.py", line 153, in main
    runctx(code, globs, None, options.outfile, options.sort)
  File "/home/bli/lib/python3.6/cProfile.py", line 20, in runctx
    filename, sort)
  File "/home/bli/lib/python3.6/profile.py", line 64, in runctx
    prof.runctx(statement, globals, locals)
  File "/home/bli/lib/python3.6/cProfile.py", line 100, in runctx
    exec(cmd, globals, locals)
  File "./forkiter.py", line 105, in <module>
    exit(main())
  File "./forkiter.py", line 101, in main
    sum_tuples, results_generator))
  File "/home/bli/lib/python3.6/multiprocessing/pool.py", line 699, in next
    raise value
  File "/home/bli/lib/python3.6/multiprocessing/pool.py", line 385, in _handle_tasks
    put(task)
  File "/home/bli/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/bli/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '__main__.FuncApplier'>: attribute lookup FuncApplier on __main__ failed
Run Code Online (Sandbox Code Playgroud)

小智 5

看起来 cProfile 根本无法与 multiprocessing 配合使用。

如果您愿意修改代码,只对主进程进行性能分析(或者为子进程单独添加性能分析),那么 cProfile.run() 似乎在一定程度上可行。

在您的示例中,将下面的第一段代码替换为第二段代码:

exit(main())
Run Code Online (Sandbox Code Playgroud)

# Profile main() from inside the script; cProfile must be imported first.
import cProfile
exit(cProfile.run('main()'))
Run Code Online (Sandbox Code Playgroud)

如果并行执行的函数是全局作用域中的函数,这种方法至少是可行的;但对于像您这种使用类的情况,我不确定是否同样适用。