相关疑难解决方法(0)

在Python中每x秒重复执行一次函数的最佳方法是什么?

我想永远每60秒在Python中重复执行一个函数(就像目标C中的NSTimer一样).这段代码将作为守护进程运行,实际上就像使用cron每分钟调用python脚本一样,但不需要用户设置.

这个关于用Python实现的cron的问题中,解决方案似乎实际上只是sleep() x秒.我不需要这样的高级功能,所以也许这样的东西可行

while True:
    # Code executed here
    time.sleep(60)
Run Code Online (Sandbox Code Playgroud)

这段代码有可预见的问题吗?

python timer

245
推荐指数
12
解决办法
36万
查看次数

FastAPI 以串行方式而不是并行方式运行 api 调用

我有以下代码:

import time
from fastapi import FastAPI, Request
    
app = FastAPI()
    
@app.get("/ping")
async def ping(request: Request):
        print("Hello")
        time.sleep(5)
        print("bye")
        return {"ping": "pong!"}
Run Code Online (Sandbox Code Playgroud)

如果我在本地主机上运行我的代码 - 例如http://localhost:8501/ping- 在同一浏览器窗口的不同选项卡中,我得到:

Hello
bye
Hello
bye
Run Code Online (Sandbox Code Playgroud)

代替:

Hello
Hello
bye
bye
Run Code Online (Sandbox Code Playgroud)

我已经阅读过有关使用的内容httpx,但仍然无法实现真正​​的并行化。有什么问题?

python asynchronous concurrent-processing python-asyncio fastapi

50
推荐指数
1
解决办法
4万
查看次数

如何在现有的阻塞库中使用asyncio?

我有很少的阻止功能foo,bar我无法改变那些(一些我无法控制的内部库.与一个或多个网络服务交谈).我如何将其用作异步?我不想做以下事情.

results = []
for inp in inps:
    val = foo(inp)
    result = bar(val)
    results.append(result)
Run Code Online (Sandbox Code Playgroud)

这将是低效的,因为我foo在等待第一个输入时可以调用第二个输入,并且相同bar.如何包装它们,这样它们与ASYNCIO(即新的使用async,await语法)?

让我们假设这些函数是可重入的.即,foo当先前foo正在处理时再次调用是可以的.


更新

用可重复使用的装饰器扩展答案.点击这里举例.

def run_in_executor(f):
    @functools.wraps(f)
    def inner(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))

    return inner
Run Code Online (Sandbox Code Playgroud)

python python-3.x async-await python-asyncio python-3.5

19
推荐指数
4
解决办法
6038
查看次数

FastAPI 中间件中的后台任务

我正在尝试将后台任务添加到我的中间件中,但尚未在他们的文档中找到任何官方方法来执行此操作,这是我迄今为止尝试过的方法:

async def task1():
    logging.info("Waiting ...")
    time.sleep(5)
    logging.info("Waited.")

@app.middleware("http")
async def add_process_time_header(request, call_next, bt=Depends(BackgroundTasks)):
    a = bt.dependency()
    a.add_task(task1)
    a()

    return await call_next(request)
Run Code Online (Sandbox Code Playgroud)

这阻止了我的请求,我是否应该能够在没有等待的情况下调用异步函数?我想完全忽略后台任务的结果,我不需要 bg 任务的结果

python asynchronous fastapi

4
推荐指数
1
解决办法
4006
查看次数

`asyncio.sleep(delay)` 是否保证睡眠至少 `delay` 秒?

asyncio.sleep()\ 的阻塞表兄弟time.sleep()不能保证它会休眠所请求的时间。

\n
\n

实际的挂起时间可能小于请求的时间,因为任何捕获的信号都将在执行该信号\xe2\x80\x99s 捕获例程后终止 sleep() 。

\n
\n

asyncio.sleep()\ 的文档没有提到类似的限制。

\n

是否asyncio.sleep()能够对其休眠时间做出更有力的保证?

\n

python python-asyncio

3
推荐指数
1
解决办法
3482
查看次数