以低优先级Popen启动进程

Coo*_*pix 8 python psutil python-multiprocessing

我正在寻找一种在Windows中以低优先级有效启动多个进程的方法.我试过了 :

def run(command):
    # command['Program.exe args1 args2','output_file']
    try :
        p = subprocess.Popen(command[0] , stdout = command[1])
        psutil.Process(p.pid).nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
        p.wait()
    except Exception as e:
        print(e)
        raise SystemExit
Run Code Online (Sandbox Code Playgroud)

问题是:低优先级不会立即设置.我开始时会冻结一些.当我仔细观察过程窗口时,我可以看到应用程序的优先级从high_priority开始并切换到low_priority.

我想立即以低优先级启动或找到另一种方法来阻止CPU使用率(现在100%).

然后我在多处理池中使用run命令(每次运行几秒钟).

def safe_run(args):
    """Call run(), catch exceptions."""
    try: 
        run(args)
    except Exception as e:
        print(args[0])
        print(e)


def parallel(commands,nb_proc):
    # populate files
    # start processes
    if len(commands) < 10:
        nb_proc = 1
    print('Use of {} cpus\n'.format(nb_proc))
    pool = mp.Pool(nb_proc)
    pool.map(safe_run, commands, chunksize=1) 
Run Code Online (Sandbox Code Playgroud)

UPDATE

Test.exe是一个fortran代码:

    integer function NumArguments()
        integer :: IARGC
        NumArguments = IARGC()
    end function

    subroutine GetArgument(Idx,Argument)
      integer, intent(in) :: Idx
      character(LEN=*), intent(out) :: Argument
      call GETARG(Idx,Argument)
   end subroutine

    program Console
    implicit none
    integer, parameter :: INTEG = SELECTED_INT_KIND(9)
    integer(INTEG), parameter :: MAX_STRING_LEN = 1024_INTEG
    character(LEN=MAX_STRING_LEN) :: FileName
    integer(INTEG) :: i


    call GetArgument(1,FileName)

    ! Body of Console
    !print *, 'Hello World'
    !print *, FileName

    call sleep(5)
    open(unit=1, file=FileName,status='new')
     Do i=1,1000,1
         write(1,*) i
     Enddo
     close(unit=1)

    end program Console
Run Code Online (Sandbox Code Playgroud)

完整代码:

# -*- coding: utf-8 -*-
"""

"""

###############################################################################
###############################################################################
#
#                     IMPORT & INIT                 
# 
###############################################################################
###############################################################################
import psutil
import subprocess
import time
import multiprocessing.dummy as mp
import os

TEST_EXE  = "Console.exe"
nb_proc      =   4



###############################################################################
###############################################################################
#
#                     FUNCTION                 
# 
###############################################################################
###############################################################################

def run(command):
    try :
        print(command[0])
        psutil.Process().nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) # lower priority
        p = subprocess.Popen(command[0] , stdout = command[1])
        psutil.Process().nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) # lower priority
        p.wait()
    except:
        print('Point {} fail'.format(point))
        raise SystemExit

def safe_run(args):
    """Call run(), catch exceptions."""
    try: 
        run(args)
    except Exception as e:
        print('{} error'.format(args[0]))


def parallel(commands,nb_proc):
    print('Use of {} cpus\n'.format(nb_proc))
    pool = mp.Pool(nb_proc) 
    pool.map(safe_run, commands, chunksize=1)


###############################################################################
###############################################################################
#
#                     MAIN SCRIPT                 
# 
###############################################################################
###############################################################################

current_dir = os.path.abspath('')
print('\nCurrent directory {}'.format(current_dir))  
t1 = time.time()

logfiles = list()        
commands = list()
logfiles_obj = list()
for step in range(100):
    logfile = open(os.path.join(current_dir,'logfile_'+ str(step) + '.out'), 'w')
    args = TEST_EXE + ' ' + os.path.join(current_dir,'output_'+str(step) + '.txt')
    temp = (args,logfile)
    commands.append(temp)

# run in parallel
print("Calculation running ...\n")
parallel(commands,nb_proc)


for log in logfiles_obj:
    log.close()

# time for running all the point and complete
t2 = time.time()
print ("\n ########## Overall time : %5.2f secondes ##########" %(t2 - t1))
print("\n ##########       Correct ending       ##########")
Run Code Online (Sandbox Code Playgroud)

Ser*_*sta 6

Posix系统的常规方法是在启动命令之前使用preexec_fn参数subprocess.Popen来调用函数(在本答案中详细说明).不幸的是,这是预期之间发生forkexec系统调用和Windows不创建过程的方式.

在Windows上,用于创建子进程的基础(WinAPI)系统调用是CreateProcess.MSDN上的页面说:

BOOL WINAPI CreateProcess(
  ...
  _In_        DWORD                 dwCreationFlags,
  ...
);
Run Code Online (Sandbox Code Playgroud)


dwCreationFlags [in]
控制优先级类和进程创建的标志...此参数还控制新进程的优先级类,该优先级用于确定进程线程的调度优先级.

不幸的是,Python接口没有设置子优先级的规定,因为它明确地说:

creationflags,如果给定,可以是CREATE_NEW_CONSOLE或REATE_NEW_PROCESS_GROUP.(仅限Windows)

但是dwCreationFlagsMSDN上的文档也说:

...如果未指定任何优先级类别标志,则优先级类默认为NORMAL_PRIORITY_CLASS,除非创建进程的优先级类别为IDLE_PRIORITY_CLASS或BELOW_NORMAL_PRIORITY_CLASS.在这种情况下,子进程接收调用进程的默认优先级.

这意味着可以简单地继承优先级,从Windows控制子优先级的方法是在启动子进程之前设置优先级,并在以下情况后立即重置:

def run(command):
    # command['Program.exe args1 args2','output_file']
    try :
        psutil.Process().nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) # lower priority
        p = subprocess.Popen(command[0] , stdout = command[1])    # start child at low priority
        psutil.Process().nice(psutil.NORMAL_PRIORITY_CLASS)  # reset current priority
        p.wait()
    except Exception as e:
        print(e)
        raise SystemExit
Run Code Online (Sandbox Code Playgroud)

这个答案的剩余部分将与像Linux或Unix这样的Posix系统相关.

preexec_fn参数Popen是你所需要的.它允许在创建子进程和执行命令之间调用可调用对象(例如函数).你可以这样做:

def set_low_pri():
    psutil.Process().nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
Run Code Online (Sandbox Code Playgroud)

然后用它来启动一个低优先级的孩子:

def run(command):
    # command['Program.exe args1 args2','output_file']
    try :
        p = subprocess.Popen(command[0] , stdout = command[1], preexec_fn=set_low_pri)
        p.wait()
    except Exception as e:
        print(e)
        raise SystemExit
Run Code Online (Sandbox Code Playgroud)

这样,Python可确保执行命令之前设置低优先级.


参考:子进程模块的文档说明:

17.5.1.2.Popen构造函数
......

class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None,
      stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None,
      universal_newlines=False, startupinfo=None, creationflags=0, restore_signals=True,
      start_new_session=False, pass_fds=(), *, encoding=None, errors=None) 
Run Code Online (Sandbox Code Playgroud)

...
如果将preexec_fn设置为可调用对象,则在子进程执行之前,将在子进程中调用此对象.(仅限POSIX)


但上面的方法不是线程安全的!如果两个线程同时运行,我们可能会遇到以下竞争条件:

  • 线程A降低优先级
  • 线程A启动它(低优先级)
  • 线程B降低优先级(无操作)
  • 线程A重置普通优先级
  • 线程B 以正常优先级启动其子进程
  • 线程B重置正常优先级(无操作)

问题是这multiprocessing.dummy是一个包装threading.标准Python库文档(3.6)在17.2.2.13中说.multiprocessing.dummy模块

multiprocessing.dummy复制多处理的API,但只不过是线程模块的包装器.

一旦发现问题,修复就很简单:只需使用a Lock来保护关键部分:

lock = mp.Lock()

def run(command):
    try :
        print(command[0])
        lock.acquire()
        psutil.Process().nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) # lower priority
        p = subprocess.Popen(command[0] , stdout = command[1])
        psutil.Process().nice(psutil.NORMAL_PRIORITY_CLASS) # normal priority
        lock.release()
        p.wait()
    except:
        print('Point {} fail'.format(point))
        raise SystemExit
Run Code Online (Sandbox Code Playgroud)


Jam*_*ent 5

没有人建议它,但仅仅因为子处理模块没有公开所需的常量,并不意味着我们不能将它们传递给模块来设置优先级:

import subprocess

ABOVE_NORMAL_PRIORITY_CLASS = 0x00008000
BELOW_NORMAL_PRIORITY_CLASS = 0x00004000
HIGH_PRIORITY_CLASS         = 0x00000080
IDLE_PRIORITY_CLASS         = 0x00000040
NORMAL_PRIORITY_CLASS       = 0x00000020
REALTIME_PRIORITY_CLASS     = 0x00000100

p = subprocess.Popen(["notepad.exe"], creationflags=BELOW_NORMAL_PRIORITY_CLASS)

p.wait()
Run Code Online (Sandbox Code Playgroud)

这将正确设置创建标志并以设置优先级启动进程,以正确显示它_winapi和子处理模块都需要修补(使常量成为模块的一部分而不是草图)