多处理 - > pathos.multiprocessing和windows

jdm*_*cbr 6 python pickle multiprocessing dill pathos

我目前正在使用python中的标准多处理来生成一系列无限期运行的进程.我并不特别关心表现; 每个线程只是在观察文件系统上的不同更改,并在修改文件时采取适当的操作.

目前,我有一个适合我的需求的解决方案,适用于Linux.我有一个函数和参数的字典,如下所示:

 job_dict['func1'] = {'target': func1, 'args': (args,)}
Run Code Online (Sandbox Code Playgroud)

对于每一个,我创建一个过程:

 import multiprocessing
 for k in job_dict.keys():
     jobs[k] = multiprocessing.Process(target=job_dict[k]['target'],
                                       args=job_dict[k]['args'])
Run Code Online (Sandbox Code Playgroud)

有了这个,我可以跟踪每个正在运行的,并在必要时重新启动因任何原因崩溃的作业.

这在Windows中不起作用.我正在使用的许多函数都是包装器,使用各种functools函数,我得到的消息是无法序列化函数(请参阅多处理和dill可以一起做什么?).我还没弄清楚为什么我在Linux中没有得到这个错误,但在Windows中也是如此.

如果我dill在Windows中启动进程之前导入,则不会出现序列化错误.但是,这些过程实际上并没有做任何事情.我无法弄清楚为什么.

然后我切换到多处理实现pathos,但没有找到Process标准multiprocessing模块中的简单类的模拟.我能够使用每个作业生成线程pathos.pools.ThreadPool.这不是map的预期用途,我敢肯定,但它启动了所有线程,并且它们在Windows中运行:

import pathos
tp = pathos.pools.ThreadPool()
for k in job_dict.keys():
    tp.uimap(job_dict[k]['target'], job_dict[k]['args'])
Run Code Online (Sandbox Code Playgroud)

但是,现在我不确定如何监视一个线程是否仍处于活动状态,我正在寻找这样,以便我可以重新启动由于某种原因崩溃的线程.有什么建议?

Mik*_*rns 6

我是pathosdill作者.这个Process类深埋pathospathos.helpers.mp.process.Process其中,mp它本身就是multiprocessing图书馆的实际分支.multiprocessing应该可以从那里访问所有内容.

另一件要知道的事情pathos是,它会让pool你保持活力,直到你从持有状态中移除它为止.这有助于减少创建"新"池的开销.要删除池,您可以:

>>> # create
>>> p = pathos.pools.ProcessPool()
>>> # remove
>>> p.clear()
Run Code Online (Sandbox Code Playgroud)

Process然而,没有这样的机制.

因为multiprocessing,Windows不同于Linux和Macintosh ...因为Windows fork在Linux 上没有正确的类似... linux可以跨进程共享对象,而在Windows上没有共享...它基本上是一个完全独立的新进程...因此序列化必须更好地将对象传递给另一个进程 - 就像将对象发送到另一台计算机一样.在Linux上,你必须这样做才能获得相同的行为:

def check(obj, *args, **kwds):
    """check pickling of an object across another process"""
    import subprocess
    fail = True
    try:
        _x = dill.dumps(x, *args, **kwds)
        fail = False
    finally:
        if fail:
            print "DUMP FAILED"
    msg = "python -c import dill; print dill.loads(%s)" % repr(_x)
    print "SUCCESS" if not subprocess.call(msg.split(None,2)) else "LOAD FAILED"
Run Code Online (Sandbox Code Playgroud)