我想永远每60秒在Python中重复执行一个函数(就像目标C中的NSTimer一样).这段代码将作为守护进程运行,实际上就像使用cron每分钟调用python脚本一样,但不需要用户设置.
在这个关于用Python实现的cron的问题中,解决方案似乎实际上只是sleep() x秒.我不需要这样的高级功能,所以也许这样的东西可行
while True:
# Code executed here
time.sleep(60)
Run Code Online (Sandbox Code Playgroud)
这段代码有可预见的问题吗?
我有以下代码:
import time
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/ping")
async def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return {"ping": "pong!"}
Run Code Online (Sandbox Code Playgroud)
如果我在本地主机上运行我的代码 - 例如http://localhost:8501/ping- 在同一浏览器窗口的不同选项卡中,我得到:
Hello
bye
Hello
bye
Run Code Online (Sandbox Code Playgroud)
代替:
Hello
Hello
bye
bye
Run Code Online (Sandbox Code Playgroud)
我已经阅读过有关使用的内容httpx,但仍然无法实现真正的并行化。有什么问题?
python asynchronous concurrent-processing python-asyncio fastapi
我有很少的阻止功能foo,bar我无法改变那些(一些我无法控制的内部库.与一个或多个网络服务交谈).我如何将其用作异步?我不想做以下事情.
results = []
for inp in inps:
val = foo(inp)
result = bar(val)
results.append(result)
Run Code Online (Sandbox Code Playgroud)
这将是低效的,因为我foo在等待第一个输入时可以调用第二个输入,并且相同bar.如何包装它们,这样它们与ASYNCIO(即新的使用async,await语法)?
让我们假设这些函数是可重入的.即,foo当先前foo正在处理时再次调用是可以的.
更新
用可重复使用的装饰器扩展答案.点击这里举例.
def run_in_executor(f):
@functools.wraps(f)
def inner(*args, **kwargs):
loop = asyncio.get_running_loop()
return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))
return inner
Run Code Online (Sandbox Code Playgroud) 我正在尝试将后台任务添加到我的中间件中,但尚未在他们的文档中找到任何官方方法来执行此操作,这是我迄今为止尝试过的方法:
async def task1():
logging.info("Waiting ...")
time.sleep(5)
logging.info("Waited.")
@app.middleware("http")
async def add_process_time_header(request, call_next, bt=Depends(BackgroundTasks)):
a = bt.dependency()
a.add_task(task1)
a()
return await call_next(request)
Run Code Online (Sandbox Code Playgroud)
这阻止了我的请求,我是否应该能够在没有等待的情况下调用异步函数?我想完全忽略后台任务的结果,我不需要 bg 任务的结果
asyncio.sleep()\ 的阻塞表兄弟time.sleep()不能保证它会休眠所请求的时间。
\n\n实际的挂起时间可能小于请求的时间,因为任何捕获的信号都将在执行该信号\xe2\x80\x99s 捕获例程后终止 sleep() 。
\n
asyncio.sleep()\ 的文档没有提到类似的限制。
是否asyncio.sleep()能够对其休眠时间做出更有力的保证?