我有一个长期运行的,守护进程的Python进程,它使用子进程在发生某些事件时生成新的子进程.长时间运行的进程由具有超级用户权限的用户启动.我需要它生成的子进程作为不同的用户(例如,"nobody")运行,同时保留父进程的超级用户权限.
我正在使用
su -m nobody -c <program to execute as a child>
Run Code Online (Sandbox Code Playgroud)
但这似乎是重量级的,并不会非常干净地死去.
有没有办法以编程方式完成此操作而不是使用su?我正在查看os.set*uid方法,但Python std lib中的doc在该领域非常稀少.
我cat | zgrep
在远程服务器上运行几个命令并单独收集它们的输出以进行进一步处理:
class MainProcessor(mp.Process):
def __init__(self, peaks_array):
super(MainProcessor, self).__init__()
self.peaks_array = peaks_array
def run(self):
for peak_arr in self.peaks_array:
peak_processor = PeakProcessor(peak_arr)
peak_processor.start()
class PeakProcessor(mp.Process):
def __init__(self, peak_arr):
super(PeakProcessor, self).__init__()
self.peak_arr = peak_arr
def run(self):
command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" '
log_lines = (subprocess.check_output(command, shell=True)).split('\n')
process_data(log_lines)
Run Code Online (Sandbox Code Playgroud)
但是,这会导致子进程('ssh ... cat ...')命令的顺序执行.第二个峰值等待第一个完成,依此类推.
如何修改此代码以便子进程调用并行运行,同时仍能够单独收集每个的输出?
我理解使用subprocess是调用外部命令的首选方式.
但是如果我想在parall中运行几个命令,但是限制生成的进程数呢?困扰我的是我无法阻止子进程.例如,如果我打电话
subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)
Run Code Online (Sandbox Code Playgroud)
然后该过程将继续,无需等待cmd
完成.因此,我无法将其包装在multiprocessing
图书馆的工作人员中.
例如,如果我这样做:
def worker(cmd):
subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile);
pool = Pool( processes = 10 );
results =[pool.apply_async(worker, [cmd]) for cmd in cmd_list];
ans = [res.get() for res in results];
Run Code Online (Sandbox Code Playgroud)
然后每个工人将在产生子流程后完成并返回.所以我无法真正限制subprocess
使用生成的进程数Pool
.
什么是限制子过程数量的正确方法?
我有一个GUI应用程序,需要从GUI主循环旁边的网络中获取和解析各种资源.我使用python多处理模块搜索了选项,因为这些获取操作不仅包含阻塞IO而且还包含大量解析,因此多处理可能比python线程更好.使用Twisted会很容易,但这次Twisted不是一个选项.
我找到了一个简单的解决方案:
问题是在MainThread中没有调用回调.
所以我想出了以下解决方案:
delegate.py
import os
import multiprocessing as mp
import signal
from collections import namedtuple
import uuid
import logging
_CALLBACKS = {}
_QUEUE = mp.Queue()
info = logging.getLogger(__name__).info
class Call(namedtuple('Call', 'id finished result error')):
def attach(self, func):
if not self.finished:
_CALLBACKS.setdefault(self.id, []).append(func)
else:
func(self.result or self.error)
return self
def callback(self):
assert self.finished, 'Call not finished yet'
r = self.result or self.error
for func in _CALLBACKS.pop(self.id, []):
func(r)
def done(self, result=None, error=None):
assert not self.finished, 'Call already finished' …
Run Code Online (Sandbox Code Playgroud)