在多线程Python程序中,一个线程有时会使用内置的raw_input()来请求控制台输入.我希望能够在raw_input提示符下通过在shell中键入^ C来关闭程序(即,使用SIGINT信号).但是,当子线程正在执行raw_input时,键入^ C什么也不做 - 在我点击返回(离开raw_input)之前不会引发KeyboardInterrupt.
例如,在以下程序中:
import threading
class T(threading.Thread):
def run(self):
x = raw_input()
print x
if __name__ == '__main__':
t = T()
t.start()
t.join()
Run Code Online (Sandbox Code Playgroud)
键入^ C在输入完成之前不会执行任何操作.但是,如果我们只是调用T().run()(即,单线程情况:只在主线程中运行raw_input),^ C立即关闭程序.
据推测,这是因为SIGINT被发送到主线程,它被挂起(等待GIL),而分支线程在控制台上阻塞读取.主线程在raw_input返回后抓取GIL之前不会执行其信号处理程序.(如果我错了,请纠正我 - 我不是Python线程实现方面的专家.)
有没有办法以类似raw_input的方式从stdin读取,同时允许主线程处理SIGINT,从而降低整个过程?
[我在Mac OS X和一些不同的Linux上观察到了上述行为.]
编辑:我错误地描述了上面的根本问题.在进一步调查中,主要线程的呼吁join()是阻止信号处理:Guido van Rossum本人已经解释说,加入中的底层锁定是不可中断的.这意味着信号实际上被推迟到整个线程完成 - 所以这实际上完全没有任何关系raw_input(只是后台线程阻塞以便连接没有完成的事实).
我正在使用Python的Watchdog监视给定目录中正在创建的新文件.创建文件时,会运行一些代码生成子进程shell命令以运行不同的代码来处理此文件.这应该为每个创建的新文件运行.我已经在创建一个文件时对此进行了测试,并且工作正常,但是在创建多个文件时,无论是同时还是一个接一个地创建它都很困难.
我当前的问题是这个...在shell中运行的处理代码需要一段时间才能运行,并且在目录中创建新文件之前不会完成.我无能为力.在此代码运行时,看门狗将无法识别已创建新文件,并且不会继续执行该代码.
所以我认为我需要为每个新文件生成一个新进程,或者做一些事情来同时运行,而不是等到一个文件完成后再处理下一个文件.
所以我的问题是:
1.)实际上,我将在一个目录中同时创建4个不同系列的文件.看门狗一次运行所有4个文件的文件创建代码的最佳方法是什么?
2.)当代码针对一个文件运行时,如何让监视程序开始处理同一系列中的下一个文件,而不必等到前一个文件的处理完成.这是必要的,因为文件是特定的,我需要暂停一个文件的处理,直到另一个文件完成,但它们的创建顺序可能会有所不同.
我是否需要以某种方式将看门狗与多处理或线程结合起来?或者我需要实现多个观察者?我有点不知所措.谢谢你的帮助.
class MonitorFiles(FileSystemEventHandler):
'''Sub-class of watchdog event handler'''
def __init__(self, config=None, log=None):
self.log = log
self.config = config
def on_created(self, event):
file = os.path.basename(event.src_path)
self.log.info('Created file {0}'.format(event.src_path))
dosWatch.go(event.src_path, self.config, self.log)
def on_modified(self, event):
file = os.path.basename(event.src_path)
ext = os.path.splitext(file)[1]
if ext == '.fits':
self.log.warning('Modifying a FITS file is not allowed')
return
def on_deleted(self, event):
self.log.critical('Nothing should ever be deleted from here!')
return
Run Code Online (Sandbox Code Playgroud)
def monitor(config, log):
'''Uses the Watchdog package to monitor …Run Code Online (Sandbox Code Playgroud) 假设我有一个threading.Lock()我想要获取的对象以便使用资源.假设我想try ... except ...在资源中使用一个子句.
有几种方法可以做到这一点.
方法1
import threading
lock = threading.Lock()
try:
with lock:
do_stuff1()
do_stuff2()
except:
do_other_stuff()
Run Code Online (Sandbox Code Playgroud)
如果在do_stuff1()或期间发生错误do_stuff2(),是否会释放锁定?或者使用以下方法之一是否更好?
方法2
with lock:
try:
do_stuff1()
do_stuff2()
except:
do_other_stuff()
Run Code Online (Sandbox Code Playgroud)
方法3
lock.acquire():
try:
do_stuff1()
do_stuff2()
except:
do_other_stuff()
finally:
lock.release()
Run Code Online (Sandbox Code Playgroud)
即使发生错误,哪种方法最适合释放锁?
我正在设计一个Python应用程序,它应该访问一台机器来执行一些(冗长的)任务.对于与网络相关的所有内容,asyncio模块似乎是一个不错的选择,但现在我需要访问一个特定组件的串行端口.我已经为实际的串口实现了一种抽象层,但是无法弄清楚如何将它与asyncio明智地集成在一起.
下面的设置:我有一个运行循环的线程,它经常与机器对话并解码响应.使用一种方法enqueue_query(),我可以将一个查询字符串放入一个队列,然后由另一个线程将其发送到机器并引发响应.通过传入threading.Event(或带有set()方法的任何东西),调用者可以执行阻塞等待响应.这可能看起来像这样:
f = threading.Event()
ch.enqueue_query('2 getnlimit', f)
f.wait()
print(ch.get_query_responses())
Run Code Online (Sandbox Code Playgroud)
我的目标是将这些行放入协程并让asyncio处理这个等待,以便应用程序可以在此期间执行其他操作.我怎么能这样做?它可能会通过将其包装f.wait()到Executor中来实现,但这似乎有点愚蠢,因为这会创建一个新线程,只是等待另一个线程做某事.
谢谢!最好的问候,菲利普
我想要一个上下文管理器,我可以在其中放置一些要在单独的线程中执行的代码。
到目前为止,我找不到一种方法来实现我想要的,最好的选择是编写闭包并在单独的线程中执行闭包。
我想要这样的东西
# code runs on main thread
print("this is main thread")
with amazingcontextmanager:
# code to run in separate thread
print("this is not main thread")
Run Code Online (Sandbox Code Playgroud)
编辑:让我尝试再次问我的问题
@contextlib.contextmanager
def amazingcontextmanager():
try:
yield
finally:
print("thread done")
Run Code Online (Sandbox Code Playgroud)
我想yield在新线程中执行。基本上我放在 contextmanager 下的任何内容都应该在单独的线程中执行。
我正在使用 FastAPI,非async端点在带有多个工作线程的 Gunicorn 上运行,来自此处uvicorn.workers.UvicornWorker建议的类。Latley,我注意到在我们的应用程序比平时更繁忙的时候,我们的一些端点存在高延迟。我开始调查它,发现我们应用程序中的并发性并没有像我们预期的那样工作。
假设我有一个具有以下端点的 FastAPI 应用程序 (main.py)
app = FastAPI()
logger = logging.getLogger()
@app.get("/")
def root():
logger.info(f"Running on {os.getpid()}")
time.sleep(3600)
return {"message": "Hello World"}
Run Code Online (Sandbox Code Playgroud)
我gunicorn使用以下命令运行:
gunicorn main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
Run Code Online (Sandbox Code Playgroud)
当我向服务器发送五个请求时,除了最后一个请求之外,它们都到达同一个工作线程,而不是在所有工作线程上并行运行:
INFO:root:Running on 643
INFO:root:Running on 643
INFO:root:Running on 643
INFO:root:Running on 643
INFO:root:Running on 642
Run Code Online (Sandbox Code Playgroud)
如果我将端点转换为async,则每个请求都将在不同的工作线程上处理(最后一个请求将被保留)。我知道,当使用非异步端点时,FastAPI 使用 AnyIO 线程来处理请求,最大线程的默认值为 40。当我尝试将此限制降低到 2 个线程时,例如使用此处的建议,只有前两个请求正在处理,而其他人正在等待(尽管我还有 4 个工人!)
这很糟糕,因为既没有使用我们所有的资源,又因为同一个工作线程上的 GIL 而遭受 python 线程问题。
有没有办法在不转向端点的情况下克服这些问题async?
我编写了一个从文件中获取 URL 并同时向所有 URL 发送 HTTP 请求的脚本。我现在想限制每秒请求HTTP的数量和每个接口(带宽eth0,eth1在会话等)。有没有办法在 Python 上实现这一点?
python throttling bandwidth-throttling python-multithreading
如果之前有人问过这个问题,请原谅我。我环顾四周,但我觉得我没有合适的词汇来通过搜索网络找到它。
我在 python 中有一个多线程应用程序。我希望能够锁定某个代码块,但只能锁定具有特定条件的其他线程。让我举个例子:有三个线程,thread_a,thread_b和thread_c。每个线程都可以foo随时运行该函数。我不希望任何两个bar彼此相等的线程能够同时访问Code block ALPHA。但是,我不想阻止bar值不同的线程。在这种情况下,假设首先thread_a有一个bar == "cat"和命中线(3)。在thread_ahits line之前(5),让我们说thread_b, with bar == "cat"hits line (3)。我愿意thread_b等待。但如果thread_c出现,与bar == "dog",我希望它能够继续前进。
(1) def foo(bar):
(2)
(3) lock(bar)
(4) # Code block ALPHA (two threads with equivalent bar should not be in here)
(5) unlock(bar)
Run Code Online (Sandbox Code Playgroud)
另请注意, 的可能值bar是完全不可预测的,但发生碰撞的可能性非常高。
感谢您的任何帮助。我正在查看的库是python …
我目前正在尝试使用带有多处理后端的 joblib 库在 python 3.5 中运行并行进程。但是,每次运行时都会出现此错误:
Process ForkServerPoolWorker-5:
Traceback (most recent call last):
File "/opt/anaconda3/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/opt/anaconda3/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 108, in worker
task = get()
File "/opt/anaconda3/lib/python3.5/site-packages/joblib/pool.py", line 362, in get
return recv()
File "/opt/anaconda3/lib/python3.5/multiprocessing/connection.py", line 251, in recv
return ForkingPickler.loads(buf.getbuffer())
TypeError: __new__() missing 1 required positional argument: 'path'
Run Code Online (Sandbox Code Playgroud)
这是我用来运行它的 joblib 代码:
from joblib import Parallel, delayed
results = Parallel(n_jobs=6) (
delayed(func)(i) for i in array)
Run Code Online (Sandbox Code Playgroud)
默认情况下,后端是多处理的。当我将后端更改为“线程”时,代码运行良好,但对于我的用例,与使用多处理相比,线程效率低下。
我还尝试使用以下代码直接使用多处理,但我仍然遇到相同的错误: …
python runtime-error python-multithreading joblib python-multiprocessing
我有一个 Flask 应用程序,在命令行上运行时可以正常工作,但是当它通过 uWSGI 运行时,它无法正确响应请求或工作线程无法正常工作。我重写了一个简单的概念验证/失败程序来演示这个问题:
from datetime import datetime
from threading import Event, Thread
from flask import Flask
class JobManager:
def __init__(self):
self.running = False
self.event = Event()
def start(self):
self.running = True
while self.running:
print("Processing Job at", datetime.now().strftime('%c'))
self.event.clear()
self.event.wait(5)
if self.event.is_set():
print("Interrupted by request!")
def stop(self):
self.running = False
self.event.set()
app = Flask(__name__)
jobs = JobManager()
t = Thread(target=jobs.start)
t.start()
@app.route('/')
def hello_world():
global jobs
jobs.event.set()
return "I'm alive at " + datetime.now().strftime('%c')
if __name__ == '__main__':
app.run() …Run Code Online (Sandbox Code Playgroud)