使用 django 频道和 celery 收听 mqtt 主题

Gab*_*din 3 django websocket celery mqtt django-channels

我想要一种将 django 与 mqtt 集成的方法,为此我首先想到的是使用 django-channels 和一个通过 Web 套接字支持 mqtt 的 mqtt 代理,这样我就可以在代理和 django-channels 之间直接通信。

但是,我没有找到从 django 启动 websocket 客户端的方法,并且根据此链接这是不可能的。

由于我也开始研究任务队列,我想知道使用 paho-mqtt 启动 mqtt 客户端,然后使用 celery 在单独的进程中运行它是否是一个好习惯。然后,该进程将通过 websocket 将代理收到的消息转发到 django 通道,这样我还可以与客户端进程通信,在需要时发布数据或停止 mqtt 客户端,所有这些都直接从 django 进行。

我对这个想法有点怀疑,因为我还读到在 celery 中运行的进程不应该花费太长时间才能完成,在这种情况下,这正是我想要做的。

所以我的问题是,这是一个多么糟糕的主意?还有其他选项可以直接将 django 与 mqtt 集成吗?

*注意:我不想在服务器上运行单独的进程,我希望能够从 django 启动和停止该进程,以便从 Web gui 完全控制 mqtt 客户端

Gab*_*din 6

我找到了一个更好的方法,不需要使用芹菜。

我只是在ready方法的app/apps.py上启动了一个mqtt客户端,所以每次运行应用程序时都会启动一个客户端。从这里我可以使用 django 通道或信号与系统的其他部分进行通信。

应用程序.py:

from django.apps import AppConfig
from threading import Thread
import paho.mqtt.client as mqtt


class MqttClient(Thread):
    def __init__(self, broker, port, timeout, topics):
    super(MqttClient, self).__init__()
    self.client = mqtt.Client()
    self.broker = broker
    self.port = port
    self.timeout = timeout
    self.topics = topics
    self.total_messages = 0

#  run method override from Thread class
def run(self):
    self.connect_to_broker()

def connect_to_broker(self):
    self.client.on_connect = self.on_connect
    self.client.on_message = self.on_message
    self.client.connect(self.broker, self.port, self.timeout)
    self.client.loop_forever()

# The callback for when a PUBLISH message is received from the server.
def on_message(self, client, userdata, msg):
    self.total_messages = self.total_messages + 1
    print(str(msg.payload) + "Total: {}".format(self.total_messages))

# The callback for when the client receives a CONNACK response from the server.
def on_connect(self, client, userdata, flags, rc):
    #  Subscribe to a list of topics using a lock to guarantee that a topic is only subscribed once
    for topic in self.topics:
        client.subscribe(topic)


class CoreConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'core'

def ready(self):
    MqttClient("192.168.0.165", 1883, 60, ["teste/01"]).start()
Run Code Online (Sandbox Code Playgroud)