使用 MySQL <> WebSocket 实时更新 Django 应用程序

dri*_*987 7 django django-channels

我需要不断从MySQL数据库中获取数据,该数据库以大约 200 毫秒的更新频率获取数据。我需要不断更新仪表板文本字段上的数据值。我的仪表板是基于 Django 构建的。

我已经阅读了很多关于Channels但所有的教程都是关于聊天应用程序的。我知道我需要实现WebSockets它基本上有一个开放的连接并获取数据。使用聊天应用程序,这是有道理的,但我还没有遇到任何关于MySQL数据库的内容。

我也读过关于mysql-events. 由于表中的数据来自外部传感器,我不明白如何监控 Django 中的表,即每当表中添加新行时,我需要根据列插入新行价值。

关于如何去做的任何想法?我浏览了很多文章,但找不到针对此要求的特定内容。

Tim*_*ayy 7

如果您需要不断查询 sql 数据库,您最好的选择是使用 Celery 或dramatiq,它们更简单/更容易,但与 Django Channels 结合使用时测试较少。

Celery 允许您创建可以向其发送任务(函数)的工作线程(有点像后台进程)。当工作人员收到任务时,它将执行。所有这一切都是在后台完成的。从工作线程正在执行的任务中,您实际上可以通过 Websocket 直接从工作线程发回数据。仅当您启用了 django 通道 + 通道层时,这才有效,因为当您启用通道层时,打开通道/websocket 时创建的每个消费者实例都会有一个名称,您可以将其传递给工作线程,以便它知道要发送哪个 websocket查询数据返回。

这个过程的流程如下:

  1. 客户端请求连接到您的 websocket
  2. 创建消费者实例并为其指定一个特定名称
  3. 消费者实例接受连接
  4. Consumer触发celery任务并传递名称
  5. Worker 开始每隔 X 秒轮询您的 SQL 数据库
  6. 当工作人员发现新条目时,使用给定的名称并通过 websocket 将新条目发回。

我建议阅读有关消费者和通道层的 django 通道文档以及 celery 或 Dramatiq 教程,以了解它们是如何工作的。为了使这一切发挥作用,您还必须了解 Redis 和消息队列服务(例如 RabbitMQ)。简单回答的内容太多,但如果您有具体问题,我可以提供更多信息。

编辑:

  • 在您的计算机上安装 Redis 服务器。如果您像我一样使用 Windows,那么您必须从 Windows 应用商店下载 WSL 2 并安装 Ubuntu(免费)。此链接可以引导您完成它。

  • 获取 RabbitMQ 服务器设置。按照他们的教程进行操作

  • 启用Django Channels和 Django-Channel-layers,然后将 Redis 设置为默认的 Django-channels 后端

  • 设置 Dramatiq 或 Celery。我更喜欢 Dramatiq,因为它基本上是 Celery 的新改进版本,尽管不太受欢迎。设置和使用更加容易。是 Django-dramatiq 的 github 存储库,它将引导您完成如何设置它。请注意,就像当您启动 django 服务器时,python manage.py runserver您必须python manage.py rundramatiq在测试网站之前启动 Dramatiq 工作人员。

  • 在 django 应用程序中创建一个tasks.py 文件,并在该任务中实现代码以检查 MySQL 数据库中的新条目。如果您还没有弄清楚,这里是开始使用的链接。在您的任务文件中,您应该有一个顶部带有装饰器的函数dramatiq.actor,以便dramatiq 知道该函数是一个任务。

  • 构建 django-channels 使用者来处理 WebSocket 连接并允许您通过 WebSocket 连接发送数据。这就是标准消费者的样子:

class AsyncDashboardConsumer(AsyncJsonWebsocketConsumer):

    async def connect(self):
        await self.accept()

    async def disconnect(self, code):
        await self.close()

    async def receive_json(self, text_data=None, bytes_data=None, **kwargs):
        someData = text_data['someData']
        someOtherData = text_data['someOtherData']

        if 'execute_getMySQLdata' in text_data['function']:
            await self.getData(someData, someOtherData)

    async def sendDataToClient(self, event):
        await self.send(text_data=event['text'])

    async def getData(self, someData, someOtherData):
        sync_to_async(SQLData.send(self.channel_name, someData, someOtherData))
Run Code Online (Sandbox Code Playgroud)
  • connect当客户端尝试连接到路由文件(在步骤 2 中)指向该使用者的 WebSocket URL 时,将调用该函数。

  • recieve_json每当客户端将数据发送到 django 服务器时都会调用该函数。

  • getData函数从该函数调用recieve_json,并发送一条消息来启动您之前创建的dramatiq 任务来检查SQL 数据库。请注意,当您发送消息时,您必须传入 self.channel_name,因为您使用该channel_name 通过 WebSocket 直接从 Dramatiq Worker/Task 发送回数据。

  • sendDataToClient当您将数据发送回客户端时使用该函数。因此,当您从任务发送数据时,您必须将这个函数作为可调用函数传入。

要从您之前创建的任务发送数据,请使用以下命令:async_to_sync(channel_layer.send)(channelName, {'type': 'sendData', 'text': jsonPayload})。请注意如何传递channelName 以及sendData来自消费者的函数。

最后,客户端的 javascript 如下所示:

    let socket = new WebSocket("wss://javascript.info/article/websocket/demo/hello");
    
    socket.onopen = function(e) {
      alert("[open] Connection established");
      alert("Sending to server");
      socket.send("My name is John");
    };
    
    socket.onmessage = function(event) {
      alert(`[message] Data received from server: ${event.data}`);
    };
    
    socket.onclose = function(event) {
      if (event.wasClean) {
        alert(`[close] Connection closed cleanly, code=${event.code} reason=${event.reason}`);
      } else {
        // e.g. server process killed or network down
        // event.code is usually 1006 in this case
        alert('[close] Connection died');
      }
    };
    
    socket.onerror = function(error) {
      alert(`[error] ${error.message}`);
    };
Run Code Online (Sandbox Code Playgroud)

代码直接来自JavaScript WebSocket 演练。

这就是具有后台工作人员的基本 Web 应用程序不断实时更新信息的方式。可能还有其他方法可以在没有后台工作人员的情况下执行此操作,但由于您希望在信息到达时尽快获取信息,因此最好有一个不断检查更新的后台进程。另一方面,上面的代码意味着为每个连接的新客户端打开与数据库的单独连接,但您可以轻松地利用 django-channels 组并与数据库建立一个连接,然后将其发送到特定组中的所有客户端。


dri*_*987 6

感谢Timothee Legros 的回答,它帮助我朝着正确的方向前进。

在互联网上的任何地方,它都说 Django 频道是/可以用于实时应用程序,但没有任何地方谈论确切的实现(除了聊天应用程序)。

我使用CeleryDjango ChannelsCelery's Beat来完成任务,它按预期工作。

它分为三个部分。设置通道,然后创建一个 celery 任务,定期调用它(在 Celery Beat 的帮助下),然后将该任务的输出发送到通道,以便它可以将该数据发送到 websocket。

频道

我遵循了 Channel 网站上的原始教程并以此为基础。

路由.py

from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer),
    re_path(r'ws/realtimeupdate/$', consumers.RealTimeConsumer),
]
Run Code Online (Sandbox Code Playgroud)

消费者.py

class RealTimeConsumer(AsyncWebsocketConsumer):
    async def connect(self):

        self.channel_group_name = 'core-realtime-data'

        # Join room group
        await self.channel_layer.group_add(
            self.channel_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(
            self.channel_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    async def receive(self, text_data):
        print(text_data)
        pass

    async def loc_message(self, event):
        # print(event)
        message_trans = event['message_trans']
        message_tag = event['message_tag']
        # print("sending data to websocket")
        await self.send(text_data=json.dumps({
            'message_trans': message_trans,
            'message_tag': message_tag
        }))
Run Code Online (Sandbox Code Playgroud)

这个类基本上会在websocket收到数据后将数据发送给它。以上两个将特定于应用程序。

现在我们将设置Celery

在项目的基本目录中,设置文件所在的位置,我们需要制作三个文件。

  • celery.py这将初始化芹菜。
  • routing.py这将用于路由通道的 websocket 地址。
  • task.py这是我们将设置任务的地方

芹菜.py

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj_name.settings')

app = Celery('proj_name', backend='redis://localhost', broker='redis://localhost/')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}') 
Run Code Online (Sandbox Code Playgroud)

路由.py

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

from app_name import routing

application = ProtocolTypeRouter({
    # (http->django views is added by default)
    'websocket': AuthMiddlewareStack(
            URLRouter(
                routing.websocket_urlpatterns
            )
        ),
})
Run Code Online (Sandbox Code Playgroud)

任务.py

@shared_task(name='realtime_task')
def RealTimeTask():
    time_s = time.time()
    result_trans = CustomModel_1.objects.all()
    result_tag = CustomModel_2.objects.all()
    result_trans_json = serializers.serialize('json', result_trans)
    result_tag_json = serializers.serialize('json', result_tag)
    # output = {"ktr": result_transmitter_json, "ktag": result_tag_json}
    # print(output)
    channel_layer = get_channel_layer()
    message = {'type': 'loc_message',
               'message_transmitter': result_trans_json,
               'message_tag': result_tag_json}
    async_to_sync(channel_layer.group_send)('core-realtime-data', message)
    print(time.time()-time_s)
Run Code Online (Sandbox Code Playgroud)

任务在完成任务后,将结果发送回通道,通道又将其中继到 websocket。

设置.py

# Channels
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

CELERY_BEAT_SCHEDULE = {
    'task-real': {
        'task': 'realtime_task',
        'schedule': 1 # this means, the task will run itself every second
    },
}
Run Code Online (Sandbox Code Playgroud)

现在唯一剩下的就是在javascript文件中创建一个 websocket并开始监听它。

//Create web socket to receive data
const chatSocket = new WebSocket(
    'ws://'
    + window.location.host
    + '/ws/realtimeupdate'
    + '/'
);

chatSocket.onmessage = function(e) {
    const data = JSON.parse(e.data);
    console.log(e.data + '\n');
    // For trans
    var arrayOfObjects = JSON.parse(data.message_trans);
    //Do your thing

    //For tags
    var arrayOfObjects_tag = JSON.parse(data.message_tag);
    //Do your thing
    }

};

chatSocket.onclose = function(e) {
    console.error('Chat socket closed unexpectedly');
};
Run Code Online (Sandbox Code Playgroud)

为了回答MySQL 的使用,我将数据MySQL从外部传感器插入到数据库中,并在tasks.py中使用Django ORM查询表。

总的来说,它完成了预期的工作,用来自 MySQL 的实时数据填充实时仪表板。可以肯定,可能有不同的更好的方法,请让我知道。