多处理超时?

use*_*216 2 python multithreading python-3.x python-multiprocessing

此代码 ping 各种机器。您能否帮我更改此代码,以便如果 ping 进程挂起超过 7 秒,它会关闭并返回一些标志?

(我想从使用 WMI 的机器中提取各种数据。为此,我将 ping 功能更改为其他功能。问题是在某些机器上 WMI 已损坏并且提取数据的过程无限期挂起。需要超时。)

import multiprocessing.dummy
import subprocess
import numpy as np
import time

start_time = time.time()

def ping(ipadd):
    try:
        response = subprocess.check_output(['ping', ipadd])
        return True
    except subprocess.CalledProcessError as e:
        return False
#print(ping('10.25.59.20'))
machine_names = \
'''
ya.ru
microsoft.com
www.google.com
www.amazon.com
www.nasa.com
'''.split()

np_machine_names = np.array(machine_names)
p = multiprocessing.dummy.Pool(7)
ping_status = p.map(ping, machine_names)
np_ping_status = np.fromiter(ping_status, dtype=bool)
print(*np_machine_names[np_ping_status], sep = '\n')


run_time = time.time() - start_time
print(f'Runtime: {run_time:.0f}')
Run Code Online (Sandbox Code Playgroud)

更新:虽然我很欣赏关于添加超时到子进程的提示,但问题仍然存在。如何关闭挂起的功能?假设我已将 ping 更改为从机器中提取 WMI 数据(这个从 Windows 机器中提取已安装软件的列表)。没有子进程可以设置计时器:

#pip install pypiwin32
import win32com.client 
strComputer = "." 
objWMIService = win32com.client.Dispatch("WbemScripting.SWbemLocator") 
objSWbemServices = objWMIService.ConnectServer(strComputer,"root\cimv2") 
colItems = objSWbemServices.ExecQuery("Select * from Win32_Product") 
for objItem in colItems: 
    print( "Caption: ", objItem.Caption )
Run Code Online (Sandbox Code Playgroud)

nox*_*fox 5

有几种方法可以解决长时间运行的执行。每种方式都有其优点和缺点。

蜜蜂

如前所述,最简单的方法是依赖 API 超时。模块,如subprocesssocketrequests等...它们的API中暴露超时参数。

只要可行,这是最好的方法。

线程

长时间运行/挂起的逻辑在单独的线程中执行。主循环可以不受干扰地继续并忽略挂起的执行。

import threading

TIMEOUT = 60

def hanging_function():
    hang_here()

thread = threading.Tread(target=hanging_function)
thread.daemon = True
thread.start()

thread.join(TIMEOUT)
if thread.is_alive():
    print("Function is hanging!")
Run Code Online (Sandbox Code Playgroud)

这种方法的问题之一是挂起的线程将继续在后台执行,消耗资源。

另一个限制是由于线程共享内存这一事实。如果您的函数碰巧严重崩溃,它也可能影响您的主要执行。

流程

我最喜欢的方法是使用multiprocessing设施在单独的进程中执行有问题的逻辑。由于进程不共享内存,有问题的函数中发生的任何事情都仅限于您可以在任何时间终止的子进程。

import multiprocessing

TIMEOUT = 60

def hanging_function():
    hang_here()

process = multiprocessing.Process(target=hanging_function)
process.daemon = True
process.start()

process.join(TIMEOUT)
if process.is_alive():
    print("Function is hanging!")
    process.terminate()
    print("Kidding, just terminated!")
Run Code Online (Sandbox Code Playgroud)

石子库是建立在这一原则之上。允许轻松分离有问题的代码并处理故障和灾难。

使用过程的缺点是它们比其他两种方法要重一些。此外,由于进程之间的内存是隔离的,因此共享数据会更复杂一些。