Art*_*kov 5 python-3.x python-asyncio telegram python-telegram-bot fastapi
我希望我的 FastAPI 应用程序能够访问始终实际bot_data的python-telegram-bot。我需要这个,所以当我调用 FastAPI 中的某个端点时,可以将消息发送到所有聊天,存储在bot_data.
据我了解这个问题:bot.run_polling()并uvicorn.run(...)启动两个独立的异步循环。我需要将它们整合在一起运行。
UPD-1:
感谢@MatsLindh,我创建了下一个函数,我将其传递给主块,但它的工作不一致。有时bot.run_polling()(获得正确的循环并且一切正常,但其他时候并因存在不同循环的错误而中断):
import asyncio
from uvicorn import Config, Server
# --snip--
def run(app: FastAPI, bot:Application):
# using get_event_loop leads to:
# RuntimeError: Cannot close a running event loop
# I guess it is because bot.run_polling()
# calls loop.run_until_complete() different tasks
# loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
server = Server(Config(app=app, port=9001))
loop.create_task(server.serve())
t = Thread(target=loop.run_forever)
t.start()
bot.run_polling()
t.join()
# --snip--
if __name__ == "__main__":
# --snip--
run(f_app, bot_app)
Run Code Online (Sandbox Code Playgroud)
我也知道我可以分解bot.run_polling()为几个单独的调用,这些调用在内部聚合,但我确信它应该只与该快捷函数一起使用。
最初的
我的简化设置如下所示。
最初,我尝试不使用线程运行,而是使用 运行multiprocessing.Proccess,但是这样我bot_data总是空的 - 我认为这是因为机器人数据不在进程之间共享,所以整个事情必须在一个进程中。在这里,我无法在一个异步循环中运行所有这些东西。
# main.py
# python3.10
# pip install fastapi[all] python-telegram-bot
from threading import Thread
import uvicorn
from telegram.ext import Application, ApplicationBuilder, PicklePersistence
from fastapi import FastAPI, Request
BOT_TOKEN = "telegram-bot-token"
MY_CHAT = 123456
class MyApp(FastAPI):
def add_bot(self, bot_app: Application):
self.bot_app = bot_app
async def post_init(app: Application):
app.bot_data["key"] = 42
f_app = MyApp()
@f_app.get("/")
async def test(request: Request):
app: MyApp = request.app
bot_app: Application = app.bot_app
val = bot_app.bot_data.get('key')
print(f"{val=}")
await bot_app.bot.send_message(MY_CHAT, f"Should be 42: {val}")
if __name__ == "__main__":
pers = PicklePersistence("storage")
bot_app = ApplicationBuilder().token(BOT_TOKEN).post_init(post_init).persistence(pers).build()
f_app.add_bot(bot_app)
t1 = Thread(target=uvicorn.run, args=(f_app,), kwargs={"port": 9001})
t1.start()
# --- Launching polling in main thread causes
# telegram.error.NetworkError: Unknown error in HTTP implementation:
# RuntimeError('<asyncio.locks.Event object at 0x7f2764e6fd00 [unset]> is bound to a different event loop')
# message is sent and value is correct, BUT app breaks and return 500
# bot_app.run_polling()
# --- Launching polling in separate thread causes
# RuntimeError: There is no current event loop in thread 'Thread-2 (run_polling)'.
# t2 = Thread(target=bot_app.run_polling)
# t2.start()
# --- Launching with asyncio causes:
# ValueError: a coroutine was expected, got <bound method Application.run_polling ...
# import asyncio
# t2 = Thread(target=asyncio.run, args=(bot_app.run_polling,))
# t2.start()
t1.join()
Run Code Online (Sandbox Code Playgroud)
Chr*_*ris 10
调用时uvicorn.run(),会创建一个新的事件循环asyncio.run()(在内部调用\xe2\x80\x94请参阅链接的源代码)。当启动服务器(因此,FastAPI 应用程序)\xe2\x80\x94 后尝试使用另一个应用程序时uvicorn,反之亦然\xe2\x80\x94 也会创建一个新的事件循环,例如您的 Telegram 机器人应用程序,该行在退出已经运行的事件循环之前,将不会到达启动其他应用程序的代码。这是因为运行事件循环是阻塞的,这意味着它将阻塞调用线程,直到事件循环终止。
如果您还尝试在已使用事件循环的应用程序内运行其他应用程序(本质上是事件循环),或者尝试调用asyncio.run()或应用程序内有多个调用loop.run_until_complete(),您将遇到如下错误:
> RuntimeError: Cannot run the event loop while another loop is running\n> RuntimeError: asyncio.run() cannot be called from a running event loop\n> RuntimeError: This event loop is already running\nRun Code Online (Sandbox Code Playgroud)\n有几种方法可以解决这个问题。出于演示目的,下面给出的解决方案使用简单的打印应用程序作为第二个应用程序,该应用程序也创建事件循环。这个应用程序如下:
\n打印应用程序.py
\n> RuntimeError: Cannot run the event loop while another loop is running\n> RuntimeError: asyncio.run() cannot be called from a running event loop\n> RuntimeError: This event loop is already running\nRun Code Online (Sandbox Code Playgroud)\n您可以使用从已运行的环境中运行uvicorn.Server.serve()(另请参阅类的实现以了解所有可用参数,即,, 等)。首先,使用创建一个新的事件循环,然后使用 将其设置为当前线程的当前事件循环。接下来,通过使用并向其传递协程(即协程对象是调用函数的结果)而不是执行该函数的方法来调度另一个异步应用程序的执行。上面就是这个函数。包装在任务中的协程可能不会立即运行。一旦事件循环找到执行任务\xe2\x80\x94的机会,它就会被调度并运行,如本答案中所述,当当前运行的协程到达表达式以及or块时,可能会发生这种情况,因为这些操作是在幕后使用的。uvicornasyncConfighostportasyncio.new_event_loop()asyncio.set_event_loop()loop.create_task()async defasyncio.run()printing_app.pygo()awaitasync forasync withawait
最后,使用loop.run_until_complete()来运行 uvicorn 服务器,通过传递uvicorn.Server.serve()coroutine\xe2\x80\x94 如果传递给的参数loop.run_until_complete()是协程,则将其包装在 a 中Task(参见相关实现,以及上面的文档链接);loop.create_task()因此,这次不需要调用协程。它将执行提供的任务并阻塞直到完成。
为了清楚起见,asyncio.new_event_loop()、asyncio.set_event_loop()和loop.run_until_complete()是使用 \xe2\x80\x94 时幕后实际发生的情况,asyncio.run()请参阅最新的 Python 类Runner实现,以及Python 3.10 中该方法的实现run()(可能更详细)清除)。
PS One 也可以使用create_task()并最终调用创建每个任务loop.run_forever(),这将永远运行事件循环,直到通过调用其stop()方法显式停止为止。另一方面,loop.run_until_complete()它将继续运行,直到您传递给它的任务完成并返回结果(或引发异常时)。根据一个人的需要以及他们必须执行的任务的性质,可以在两者之间进行选择。
import asyncio\n\nasync def go():\n counter = 0\n while True:\n counter += 1\n print(counter)\n await asyncio.sleep(1)\n\n \ndef run():\n asyncio.run(go())\nRun Code Online (Sandbox Code Playgroud)\n由于这是一个 FastAPI 应用程序,您可以照常运行服务器(使用uvicorn.run(app)),并利用 FastAPI\'s/Starlette\'s Lifespan 事件在 处执行第二个应用程序startup。要执行它,您可以使用asyncio.create_task(),它将协程包装到任务中,如前所述,并安排其执行。该任务将在 返回的循环中执行asyncio.get_running_loop(),该循环返回当前线程中的事件循环。或者,您可以调用asyncio.get_running_loop()自己来获取正在运行的事件循环,然后使用该create_task()函数(如前所述)来执行任务。
from fastapi import FastAPI\nimport printing_app\nimport asyncio\nimport uvicorn\n\napp = FastAPI()\n\n\n@app.get(\'/\')\ndef main():\n return \'Hello World!\'\n \n\ndef start_uvicorn(loop):\n config = uvicorn.Config(app, loop=loop)\n server = uvicorn.Server(config)\n loop.run_until_complete(server.serve())\n \n\ndef start_printing_app(loop):\n loop.create_task(printing_app.go()) # pass go() (coroutine), not run() \n\n \nif __name__ == \'__main__\':\n loop = asyncio.new_event_loop()\n asyncio.set_event_loop(loop)\n start_printing_app(loop)\n start_uvicorn(loop)\nRun Code Online (Sandbox Code Playgroud)\n另一种变体是使用asyncio.run()创建一个async运行应用程序的环境,然后调用asyncio.create_task()启动另一个应用程序,最后使用await server.serve()启动 uvicorn 服务器\xe2\x80\x94,一旦 uvicorn 执行,最后一部分之后的任何进一步代码都将被执行服务器已完成运行或强制退出(例如,按 时CTRL + C)。
from fastapi import FastAPI\nfrom contextlib import asynccontextmanager\nimport asyncio\nimport printing_app\nimport uvicorn\n\n\n@asynccontextmanager\nasync def lifespan(app: FastAPI):\n asyncio.create_task(printing_app.go())\n # Alternatively:\n #loop = asyncio.get_running_loop()\n #loop.create_task(printing_app.go())\n yield\n\n\napp = FastAPI(lifespan=lifespan)\n\n\n@app.get(\'/\')\ndef main():\n return \'Hello World!\'\n \n\nif __name__ == \'__main__\':\n uvicorn.run(app)\nRun Code Online (Sandbox Code Playgroud)\n另一种解决方案是使用(如此处nest_asyncio所示),它允许在嵌套环境中运行多个事件循环。但是,通常建议避免使用嵌套事件循环,因为它可能会导致意外行为。asyncio
正如相关库的维护者在 github 上的评论Application.run_polling()中提到的,使用纯粹是可选的,并且会阻塞事件循环,直到用户发送停止信号;这就是run_polling()不适合与 ASGI 框架(例如 FastAPI)结合使用的原因。run_polling()在这种情况下,您只需手动调用在幕后实际运行的方法即可。可以在此处查看展示如何在 Starlette 应用程序上运行 uvicorn 服务器以及 telegram-bot 应用程序的示例。基于该示例和之前提供的所有信息,提供了以下解决方案。
from fastapi import FastAPI\nimport asyncio\nimport printing_app\nimport uvicorn\n\napp = FastAPI()\n\n\n@app.get(\'/\')\ndef main():\n return \'Hello World!\'\n\n \nasync def main():\n # start printing app\n asyncio.create_task(printing_app.go())\n \n # start uvicorn server\n config = uvicorn.Config(app)\n server = uvicorn.Server(config)\n await server.serve()\n \n \nif __name__ == \'__main__\':\n asyncio.run(main())\nRun Code Online (Sandbox Code Playgroud)\nfrom fastapi import FastAPI\nimport asyncio\nimport uvicorn\n\napp = FastAPI()\n\n\n@app.get(\'/\')\ndef main():\n return \'Hello World!\'\n \n\nasync def main():\n config = uvicorn.Config(app, host=\'0.0.0.0\', port=8000)\n server = uvicorn.Server(config)\n \n application = .... # initialise your telegram-bot app\n \n # Run application and webserver together\n async with application:\n await application.start()\n await server.serve()\n await application.stop()\n\n\nif __name__ == \'__main__\':\n asyncio.run(main())\nRun Code Online (Sandbox Code Playgroud)\n
| 归档时间: |
|
| 查看次数: |
4644 次 |
| 最近记录: |