使用 Django + Telegram + Celery 执行周期性和非周期性任务

dpv*_*dpv 0 python django celery telegram python-telegram-bot

我正在构建一个基于 Django 的项目,我的目的之一是拥有一个从 Telegram 组接收信息的 Telegram 机器人。我能够实现机器人在 Telegram 中发送消息,没有任何问题。

目前,我有几个与 Beat 一起运行的 Celery 任务以及 Django Web(已解耦)。这里一切都好。

我已经看到 python-telegram-bot 正在一个示例中运行一个函数(https://github.com/python-telegram-bot/python-telegram-bot/blob/master/examples/echobot.py)它正在等待空闲以接收来自 Telegram 的数据。现在,我在 Celery 中的所有任务此时都是周期性的,并且由 Beat 每 10 或 60 分钟调用一次。如何在我的配置中使用 Celery 运行这个非周期性任务?我说非周期性是因为我知道它将等待内容直到被手动中断。

  • 姜戈~=3.2.6

  • 芹菜~=5.1.2

     CELERY_BEAT_SCHEDULE = {
     'task_1': {
         'task': 'apps.envc.tasks.Fetch1',
         'schedule': 600.0,
     },
     'task_2': {
         'task': 'apps.envc.tasks.Fetch2',
         'schedule': crontab(minute='*/60'),
     },
     'task_3': {
         'task': 'apps.envc.tasks.Analyze',
         'schedule': 600,
     },
    
    Run Code Online (Sandbox Code Playgroud)

    }

在我的tasks.py 中,我有这样的任务之一:

@celery_app.task(name='apps.envc.tasks.TelegramBot')
def TelegramBot():
    status = start_bot()
    return status
Run Code Online (Sandbox Code Playgroud)

作为 start_bot 实现,我只是复制了 echobot.py 示例,并在其中添加了我的 TOKEN(当然示例中不同命令的函数也在那里)。

DJ *_*nes 5

设置 webhook 而不是使用 Celery 进行轮询

\n

使用 Django,您不应该使用 Celery 来运行 Telegram 轮询(您称之为 PTB 的 \xe2\x80\x9c 非周期性任务 \xe2\x80\x9d,最好将其描述为长时间运行的进程或服务)。Celery 是为明确的任务而设计的,而不是无限期运行的进程。

\n

由于 Django 暗示您已经在运行 Web 服务器,因此 Webhook 选项更合适。(请记住,您可以进行轮询或设置 Webhook,以便从 Telegram 服务器接收更新。)@CallMeStag建议的选项,即使用非线程 Webhook 设置)对于 Django-PTB 最有意义一体化。

\n

Dispatcher您可以在单独的模块中进行机器人设置(在实例上定义和注册处理程序函数);为了避免线程化,你应该传递update_queue=None, workers=0给你的Dispatcher实例化。然后,在 Django 视图中使用它,如下所示:

\n
import json\nfrom django.views.decorators.csrf import csrf_exempt\nfrom telegram import Update\n\nfrom .telegram_init import telegram_bot, telegram_dispatcher\n\n...\n\n@csrf_exempt\ndef telegram_webhook(request):\n    data = json.loads(request.body)\n    update = Update.de_json(data, telegram_bot)\n    telegram_dispatcher.process_update(update)\n\n    return JsonResponse({})\n
Run Code Online (Sandbox Code Playgroud)\n

我用于实例化的实例telegram_bot在哪里Bottelegram_dispatcher。(我在这段代码中省略了错误处理。)

\n

为什么要避免线程化?更一般意义上的线程在 Django 中并不被禁止,但在 PTB 的上下文中,线程通常意味着在共享更新/消息队列的长时间运行的线程中运行机器人更新程序或调度程序,这是一个不复杂的情况。例如,典型的 Django 部署在不同的进程中使用多个 Gunicorn 工作线程,看起来既不漂亮,也不好用。然而,在 Django-PTB 集成中使用多线程(实际上是使用 Celery 的多个进程)是有动机的见下文。

\n

开发环境注意事项

\n

上述设置是您想要用于基本生产系统的设置。但在开发过程中,除非您的开发机器具有固定 IP 面向互联网,否则您可能无法使用 Webhook,因此您仍然需要进行轮询。一种方法是创建自定义 Django 管理命令

\n

<my_app>/management/commands/polltelegram.py:

\n
from django.core.management.base import BaseCommand\n\nfrom my_django_project.telegram_init import telegram_updater\n\n\nclass Command(BaseCommand):\n    help = \'Run Telegram bot polling.\'\n\n    def handle(self, *args, **options):\n        updater.start_polling()\n        self.stdout.write(\n            \'Telegram bot polling started. \'\n            \'Press CTRL-BREAK to terminate.\'\n        )\n        updater.idle()\n        self.stdout.write(\'Polling stopped.\')\n
Run Code Online (Sandbox Code Playgroud)\n

然后,在开发期间,运行python manage.py polltelegram以获取并处理 Telegram 更新。(与此一起 运行python manage.py runserver以便能够同时使用主 Django 应用程序;轮询在使用此设置的单独进程中运行,而不仅仅是单独的线程。)

\n

当芹菜有意义时

\n

如果您将 PTB 与 Django 集成,那么 Celery 确实可以发挥作用,而这正是可靠性成为问题的时候。例如,当您希望在出现暂时性网络问题时能够重试发送回复。另一个潜在问题是,上面详述的非线程 Webhook 设置在高流量场景中可能会遇到洪水/速率限制。PTB 当前的解决方案MessageQueue使用线程,虽然它可以工作,但它可能会引入其他问题,例如在runserver开发期间运行时干扰 Django 的自动重新加载功能。

\n

更优雅可靠的解决方案是使用Celery来运行PTB的消息发送功能。这允许重试和速率限制以获得更好的可靠性。

\n

简而言之,此集成仍然可以使用上面的非线程 webhook 设置,但您必须将函数隔离Bot.send_message()到 Celery 任务中,然后确保所有处理程序异步调用此 Celery 任务,而不是使用机器人send_message()在 webhook 中运行“热切”地处理。

\n