I am using Flask 1.0.2 with Python 3.6 on Ubuntu 18.04. My app should use asyncio and asyncio.create_subprocess_exec() to launch a background script, read stdout from it, and then return a status once the script is done.
I am basically trying to implement an answer from this post: Non-blocking read on a subprocess.PIPE in Python
The script is launched successfully and I get all of my expected output from it, but the problem is that it never returns (meaning the Killing subprocess now line is never reached). When I check the process list (ps) from the Linux terminal, the background script has already exited.
What am I doing wrong, and how can I successfully get out of the async for line in process.stdout loop?
At the top of my file, after my imports, I create the event loop:
# Create a loop to run all the tasks in.
global eventLoop ; asyncio.set_event_loop(None)
eventLoop = asyncio.new_event_loop()
asyncio.get_child_watcher().attach_loop(eventLoop)
Above my route, I define the async coroutine:
async def readAsyncFunctionAndKill(cmd):
    # Use global event loop
    global eventLoop
    print("[%s] Starting async Training Script ..." % (os.path.basename(__file__)))
    process = await asyncio.create_subprocess_exec(cmd, stdout=PIPE, loop=eventLoop)
    print("[%s] Starting to read stdout ..." % (os.path.basename(__file__)))
    async for line in process.stdout:
        line = line.decode(locale.getpreferredencoding(False))
        print("%s" % line, flush=True)
    print("[%s] Killing subprocess now ..." % (os.path.basename(__file__)))
    process.kill()
    print("[%s] Training process return code was: %s" % (os.path.basename(__file__), process.returncode))
    return await process.wait()  # wait for the child process to exit
My (abbreviated) route is here:
@app.route("/train_model", methods=["GET"])
def train_new_model():
    # Use global event loop
    global eventLoop
    with closing(eventLoop):
        eventLoop.run_until_complete(readAsyncFunctionAndKill("s.py"))
    return jsonify("done"), 200
The script called "s.py" is marked executable and is in the same working directory. The abbreviated script is shown here (it contains several subprocesses and instantiates PyTorch classes):
def main():
    # Ensure that swap is activated since we don't have enough RAM to train our model otherwise
    print("[%s] Activating swap now ..." % (os.path.basename(__file__)))
    subprocess.call("swapon -a", shell=True)
    # Need to initialize GPU
    print("[%s] Initializing GPU ..." % (os.path.basename(__file__)))
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    defaults.device = torch.device("cuda")
    with torch.cuda.device(0):
        torch.tensor([1.]).cuda()
    print("[%s] Cuda is Available: %s - with Name: %s ..." % (os.path.basename(__file__), torch.cuda.is_available(), torch.cuda.get_device_name(0)))
    try:
        print("[%s] Beginning to train new model and replace existing model ..." % (os.path.basename(__file__)))
        # Batch size
        bs = 16
        #bs = 8
        # Create ImageBunch
        tfms = get_transforms(do_flip=True,
                              flip_vert=True,
                              max_rotate=180.,
                              max_zoom=1.1,
                              max_lighting=0.5,
                              max_warp=0.1,
                              p_affine=0.75,
                              p_lighting=0.75)
        # Create databunch using folder names as class names
        # This also applies the transforms and batch size to the data
        os.chdir(TRAINING_DIR)
        data = ImageDataBunch.from_folder("TrainingData", ds_tfms=tfms, train='.', valid_pct=0.2, bs=bs)
        ...
        # Create a new learner with an early stop callback
        learn = cnn_learner(data, models.resnet18, metrics=[accuracy], callback_fns=[
            partial(EarlyStoppingCallback, monitor='accuracy', min_delta=0.01, patience=3)])
        ...
        print("[%s] All done training ..." % (os.path.basename(__file__)))
        # Success
        sys.exit(0)
    except Exception as err:
        print("[%s] Error training model [ %s ] ..." % (os.path.basename(__file__), err))
        sys.exit(255)

if __name__ == "__main__":
    main()
There are several issues here:
You create a new event loop on import, once, but close the loop in your view. There is no need to close the loop at all; because the loop is now closed, a second request will fail.
The asyncio event loop is not thread safe and should not be shared between threads, and the vast majority of Flask deployments use threads to handle incoming requests. Your code carries echoes of how this should be handled instead, but unfortunately it is not the correct approach. For example, asyncio.get_child_watcher().attach_loop(eventLoop) is mostly redundant, because eventLoop = asyncio.new_event_loop(), if run on the main thread, already does exactly that.
This is the main candidate for the issues you are seeing.
Your code assumes that the executable actually exists and is executable. You should be handling OSError exceptions (and subclasses), because an unqualified s.py would only work if it is made executable, starts with a #! shebang line, and is found on PATH. It won't work just because it happens to be in the same directory, nor would you want to rely on the current working directory anyway.
Your code assumes that the process closes stdout at some point. If the subprocess never closes stdout (something that happens automatically when the process exits), then the async for line in process.stdout: loop will also wait forever. Consider adding timeouts to the code to avoid getting blocked on a faulty subprocess.
There are two sections in the Python asyncio documentation that you really want to read when using asyncio subprocesses in a multi-threaded application:
The Concurrency and Multithreading section explains that almost all asyncio objects are not thread safe. You don't want to add tasks to the loop from other threads directly; either use an event loop per thread, or use the asyncio.run_coroutine_threadsafe() function to run a coroutine on a loop in a specific thread.
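The second option can be sketched in a few lines. Everything here (the loop, the add coroutine, the timeout) is an illustrative assumption, not part of the solution that follows:

```python
import asyncio
import threading

# One event loop running forever in a dedicated thread; other threads
# submit coroutines to it with asyncio.run_coroutine_threadsafe().
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def add(a, b):
    await asyncio.sleep(0)  # stand-in for real awaitable work
    return a + b

# run_coroutine_threadsafe() returns a concurrent.futures.Future, so the
# calling thread can block on .result() with an ordinary timeout.
future = asyncio.run_coroutine_threadsafe(add(2, 3), loop)
print(future.result(timeout=5))  # prints 5
```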
For Python versions up to 3.7, you also need to read the Subprocess and Threads section, because up to that version asyncio uses a non-blocking os.waitpid(-1, os.WNOHANG) call to track child state, and relies on signal handling (which can only be done on the main thread). Python 3.8 removed this restriction, by adding a new child watcher implementation that uses a blocking per-process os.waitpid() call in a separate thread, at the expense of extra memory.
You don't have to stick to the default child watcher strategy, however. You can use EventLoopPolicy.set_child_watcher() and pass in a different process watcher instance. In practice that means backporting the 3.8 ThreadedChildWatcher implementation.
For your use case, there really is no need to run a new event loop per thread. Run a single loop, in a separate thread, as needed. If you use a loop in a separate thread, then depending on your Python version, you may also need a running loop on the main thread, or a different process watcher. Generally speaking, running an asyncio loop on a WSGI server's main thread is not going to be easy, or even possible.
So you need to run a loop, permanently, in a separate thread, and you need to use a child process watcher that works without a main-thread loop. Here is an implementation of exactly that; it should work for Python versions 3.6 and newer:
import asyncio
import itertools
import logging
import time
import threading

try:
    # Python 3.8 or newer has a suitable process watcher
    asyncio.ThreadedChildWatcher
except AttributeError:
    # backport the Python 3.8 threaded child watcher
    import os
    import warnings

    # Python 3.7 preferred API
    _get_running_loop = getattr(asyncio, "get_running_loop", asyncio.get_event_loop)

    class _Py38ThreadedChildWatcher(asyncio.AbstractChildWatcher):
        def __init__(self):
            self._pid_counter = itertools.count(0)
            self._threads = {}

        def is_active(self):
            return True

        def close(self):
            pass

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            pass

        def __del__(self, _warn=warnings.warn):
            threads = [t for t in list(self._threads.values()) if t.is_alive()]
            if threads:
                _warn(
                    f"{self.__class__} has registered but not finished child processes",
                    ResourceWarning,
                    source=self,
                )

        def add_child_handler(self, pid, callback, *args):
            loop = _get_running_loop()
            thread = threading.Thread(
                target=self._do_waitpid,
                name=f"waitpid-{next(self._pid_counter)}",
                args=(loop, pid, callback, args),
                daemon=True,
            )
            self._threads[pid] = thread
            thread.start()

        def remove_child_handler(self, pid):
            # asyncio never calls remove_child_handler() !!!
            # The method is no-op but is implemented because
            # abstract base class requires it
            return True

        def attach_loop(self, loop):
            pass

        def _do_waitpid(self, loop, expected_pid, callback, args):
            assert expected_pid > 0

            try:
                pid, status = os.waitpid(expected_pid, 0)
            except ChildProcessError:
                # The child process is already reaped
                # (may happen if waitpid() is called elsewhere).
                pid = expected_pid
                returncode = 255
                logger.warning(
                    "Unknown child process pid %d, will report returncode 255", pid
                )
            else:
                if os.WIFSIGNALED(status):
                    returncode = -os.WTERMSIG(status)
                elif os.WIFEXITED(status):
                    returncode = os.WEXITSTATUS(status)
                else:
                    returncode = status

                if loop.get_debug():
                    logger.debug(
                        "process %s exited with returncode %s", expected_pid, returncode
                    )

            if loop.is_closed():
                logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            else:
                loop.call_soon_threadsafe(callback, pid, returncode, *args)

            self._threads.pop(expected_pid)

    # add the watcher to the loop policy
    asyncio.get_event_loop_policy().set_child_watcher(_Py38ThreadedChildWatcher())

__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]

logger = logging.getLogger(__name__)

class EventLoopThread(threading.Thread):
    loop = None
    _count = itertools.count(0)

    def __init__(self):
        name = f"{type(self).__name__}-{next(self._count)}"
        super().__init__(name=name, daemon=True)

    def __repr__(self):
        loop, r, c, d = self.loop, False, True, False
        if loop is not None:
            r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
        return (
            f"<{type(self).__name__} {self.name} id={self.ident} "
            f"running={r} closed={c} debug={d}>"
        )

    def run(self):
        self.loop = loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_forever()
        finally:
            try:
                shutdown_asyncgens = loop.shutdown_asyncgens()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_asyncgens)
            loop.close()
            asyncio.set_event_loop(None)

    def stop(self):
        loop, self.loop = self.loop, None
        if loop is None:
            return
        loop.call_soon_threadsafe(loop.stop)
        self.join()

_lock = threading.Lock()
_loop_thread = None

def get_event_loop():
    global _loop_thread
    with _lock:
        if _loop_thread is None:
            _loop_thread = EventLoopThread()
            _loop_thread.start()
            # wait for run() to have created the loop; otherwise the first
            # caller could race ahead and see self.loop still set to None
            while _loop_thread.loop is None:
                time.sleep(0.001)
        return _loop_thread.loop

def stop_event_loop():
    global _loop_thread
    with _lock:
        if _loop_thread is not None:
            _loop_thread.stop()
            _loop_thread = None

def run_coroutine(coro):
    return asyncio.run_coroutine_threadsafe(coro, get_event_loop())
The above is the same general "run asyncio with Flask" solution I posted for Make a Python asyncio call from a Flask route, but with the ThreadedChildWatcher backport added.
You can then use the loop returned by get_event_loop() to run child processes, by calling run_coroutine_threadsafe():
import asyncio
import concurrent.futures
import locale
import logging

logger = logging.getLogger(__name__)

def get_command_output(cmd, timeout=None):
    encoding = locale.getpreferredencoding(False)

    async def run_async():
        try:
            process = await asyncio.create_subprocess_exec(
                cmd, stdout=asyncio.subprocess.PIPE)
        except OSError:
            logger.exception("Process %s could not be started", cmd)
            return

        async for line in process.stdout:
            line = line.decode(encoding)
            # TODO: actually do something with the data.
            print(line, flush=True)

        process.kill()
        logger.debug("Process for %s exiting with %i", cmd, process.returncode)
        return await process.wait()

    future = run_coroutine(run_async())
    result = None
    try:
        result = future.result(timeout)
    except concurrent.futures.TimeoutError:
        # Future.result() raises concurrent.futures.TimeoutError,
        # not asyncio.TimeoutError
        logger.warning("The child process took too long, cancelling the task...")
        future.cancel()
    except Exception:
        logger.exception("The child process raised an exception")
    return result
Note that the above function takes an optional timeout, in seconds: the maximum amount of time you'll wait for the subprocess to complete.