dri*_*987 7 django django-channels
我需要不断从MySQL数据库中获取数据,该数据库以大约 200 毫秒的更新频率获取数据。我需要不断更新仪表板文本字段上的数据值。我的仪表板是基于 Django 构建的。
我已经阅读了很多关于Channels但所有的教程都是关于聊天应用程序的。我知道我需要实现WebSockets它基本上有一个开放的连接并获取数据。使用聊天应用程序,这是有道理的,但我还没有遇到任何关于MySQL数据库的内容。
我也读过关于mysql-events. 由于表中的数据来自外部传感器,我不明白如何监控 Django 中的表,即每当表中添加新行时,我需要根据列插入新行价值。
关于如何去做的任何想法?我浏览了很多文章,但找不到针对此要求的特定内容。
如果您需要不断查询 sql 数据库,您最好的选择是使用 Celery 或dramatiq,它们更简单/更容易,但与 Django Channels 结合使用时测试较少。
Celery 允许您创建可以向其发送任务(函数)的工作线程(有点像后台进程)。当工作人员收到任务时,它将执行。所有这一切都是在后台完成的。从工作线程正在执行的任务中,您实际上可以通过 Websocket 直接从工作线程发回数据。仅当您启用了 django 通道 + 通道层时,这才有效,因为当您启用通道层时,打开通道/websocket 时创建的每个消费者实例都会有一个名称,您可以将其传递给工作线程,以便它知道要发送哪个 websocket查询数据返回。
这个过程的流程如下:
我建议阅读有关消费者和通道层的 django 通道文档以及 celery 或 Dramatiq 教程,以了解它们是如何工作的。为了使这一切发挥作用,您还必须了解 Redis 和消息队列服务(例如 RabbitMQ)。简单回答的内容太多,但如果您有具体问题,我可以提供更多信息。
编辑:
在您的计算机上安装 Redis 服务器。如果您像我一样使用 Windows,那么您必须从 Windows 应用商店下载 WSL 2 并安装 Ubuntu(免费)。此链接可以引导您完成它。
获取 RabbitMQ 服务器设置。按照他们的教程进行操作
启用Django Channels和 Django-Channel-layers,然后将 Redis 设置为默认的 Django-channels 后端。
设置 Dramatiq 或 Celery。我更喜欢 Dramatiq,因为它基本上是 Celery 的新改进版本,尽管不太受欢迎。设置和使用更加容易。这是 Django-dramatiq 的 github 存储库,它将引导您完成如何设置它。请注意,就像当您启动 django 服务器时,python manage.py runserver您必须python manage.py rundramatiq在测试网站之前启动 Dramatiq 工作人员。
在 django 应用程序中创建一个tasks.py 文件,并在该任务中实现代码以检查 MySQL 数据库中的新条目。如果您还没有弄清楚,这里是开始使用的链接。在您的任务文件中,您应该有一个顶部带有装饰器的函数dramatiq.actor,以便dramatiq 知道该函数是一个任务。
构建 django-channels 使用者来处理 WebSocket 连接并允许您通过 WebSocket 连接发送数据。这就是标准消费者的样子:
class AsyncDashboardConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
await self.accept()
async def disconnect(self, code):
await self.close()
async def receive_json(self, text_data=None, bytes_data=None, **kwargs):
someData = text_data['someData']
someOtherData = text_data['someOtherData']
if 'execute_getMySQLdata' in text_data['function']:
await self.getData(someData, someOtherData)
async def sendDataToClient(self, event):
await self.send(text_data=event['text'])
async def getData(self, someData, someOtherData):
sync_to_async(SQLData.send(self.channel_name, someData, someOtherData))
Run Code Online (Sandbox Code Playgroud)
connect当客户端尝试连接到路由文件(在步骤 2 中)指向该使用者的 WebSocket URL 时,将调用该函数。
recieve_json每当客户端将数据发送到 django 服务器时都会调用该函数。
getData函数从该函数调用recieve_json,并发送一条消息来启动您之前创建的dramatiq 任务来检查SQL 数据库。请注意,当您发送消息时,您必须传入 self.channel_name,因为您使用该channel_name 通过 WebSocket 直接从 Dramatiq Worker/Task 发送回数据。
sendDataToClient当您将数据发送回客户端时使用该函数。因此,当您从任务发送数据时,您必须将这个函数作为可调用函数传入。
要从您之前创建的任务发送数据,请使用以下命令:async_to_sync(channel_layer.send)(channelName, {'type': 'sendData', 'text': jsonPayload})。请注意如何传递channelName 以及sendData来自消费者的函数。
最后,客户端的 javascript 如下所示:
let socket = new WebSocket("wss://javascript.info/article/websocket/demo/hello");
socket.onopen = function(e) {
alert("[open] Connection established");
alert("Sending to server");
socket.send("My name is John");
};
socket.onmessage = function(event) {
alert(`[message] Data received from server: ${event.data}`);
};
socket.onclose = function(event) {
if (event.wasClean) {
alert(`[close] Connection closed cleanly, code=${event.code} reason=${event.reason}`);
} else {
// e.g. server process killed or network down
// event.code is usually 1006 in this case
alert('[close] Connection died');
}
};
socket.onerror = function(error) {
alert(`[error] ${error.message}`);
};
Run Code Online (Sandbox Code Playgroud)
此代码直接来自JavaScript WebSocket 演练。
这就是具有后台工作人员的基本 Web 应用程序不断实时更新信息的方式。可能还有其他方法可以在没有后台工作人员的情况下执行此操作,但由于您希望在信息到达时尽快获取信息,因此最好有一个不断检查更新的后台进程。另一方面,上面的代码意味着为每个连接的新客户端打开与数据库的单独连接,但您可以轻松地利用 django-channels 组并与数据库建立一个连接,然后将其发送到特定组中的所有客户端。
感谢Timothee Legros 的回答,它帮助我朝着正确的方向前进。
在互联网上的任何地方,它都说 Django 频道是/可以用于实时应用程序,但没有任何地方谈论确切的实现(除了聊天应用程序)。
我使用Celery、Django Channels和Celery's Beat来完成任务,它按预期工作。
它分为三个部分。设置通道,然后创建一个 celery 任务,定期调用它(在 Celery Beat 的帮助下),然后将该任务的输出发送到通道,以便它可以将该数据发送到 websocket。
频道
我遵循了 Channel 网站上的原始教程并以此为基础。
路由.py
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer),
re_path(r'ws/realtimeupdate/$', consumers.RealTimeConsumer),
]
Run Code Online (Sandbox Code Playgroud)
消费者.py
class RealTimeConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.channel_group_name = 'core-realtime-data'
# Join room group
await self.channel_layer.group_add(
self.channel_group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
# Leave room group
await self.channel_layer.group_discard(
self.channel_group_name,
self.channel_name
)
# Receive message from WebSocket
async def receive(self, text_data):
print(text_data)
pass
async def loc_message(self, event):
# print(event)
message_trans = event['message_trans']
message_tag = event['message_tag']
# print("sending data to websocket")
await self.send(text_data=json.dumps({
'message_trans': message_trans,
'message_tag': message_tag
}))
Run Code Online (Sandbox Code Playgroud)
这个类基本上会在websocket收到数据后将数据发送给它。以上两个将特定于应用程序。
现在我们将设置Celery。
在项目的基本目录中,设置文件所在的位置,我们需要制作三个文件。
芹菜.py
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj_name.settings')
app = Celery('proj_name', backend='redis://localhost', broker='redis://localhost/')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
Run Code Online (Sandbox Code Playgroud)
路由.py
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from app_name import routing
application = ProtocolTypeRouter({
# (http->django views is added by default)
'websocket': AuthMiddlewareStack(
URLRouter(
routing.websocket_urlpatterns
)
),
})
Run Code Online (Sandbox Code Playgroud)
任务.py
@shared_task(name='realtime_task')
def RealTimeTask():
time_s = time.time()
result_trans = CustomModel_1.objects.all()
result_tag = CustomModel_2.objects.all()
result_trans_json = serializers.serialize('json', result_trans)
result_tag_json = serializers.serialize('json', result_tag)
# output = {"ktr": result_transmitter_json, "ktag": result_tag_json}
# print(output)
channel_layer = get_channel_layer()
message = {'type': 'loc_message',
'message_transmitter': result_trans_json,
'message_tag': result_tag_json}
async_to_sync(channel_layer.group_send)('core-realtime-data', message)
print(time.time()-time_s)
Run Code Online (Sandbox Code Playgroud)
任务在完成任务后,将结果发送回通道,通道又将其中继到 websocket。
设置.py
# Channels
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [('127.0.0.1', 6379)],
},
},
}
CELERY_BEAT_SCHEDULE = {
'task-real': {
'task': 'realtime_task',
'schedule': 1 # this means, the task will run itself every second
},
}
Run Code Online (Sandbox Code Playgroud)
现在唯一剩下的就是在javascript文件中创建一个 websocket并开始监听它。
//Create web socket to receive data
const chatSocket = new WebSocket(
'ws://'
+ window.location.host
+ '/ws/realtimeupdate'
+ '/'
);
chatSocket.onmessage = function(e) {
const data = JSON.parse(e.data);
console.log(e.data + '\n');
// For trans
var arrayOfObjects = JSON.parse(data.message_trans);
//Do your thing
//For tags
var arrayOfObjects_tag = JSON.parse(data.message_tag);
//Do your thing
}
};
chatSocket.onclose = function(e) {
console.error('Chat socket closed unexpectedly');
};
Run Code Online (Sandbox Code Playgroud)
为了回答MySQL 的使用,我将数据MySQL从外部传感器插入到数据库中,并在tasks.py中使用Django ORM查询表。
总的来说,它完成了预期的工作,用来自 MySQL 的实时数据填充实时仪表板。可以肯定,可能有不同的更好的方法,请让我知道。
| 归档时间: |
|
| 查看次数: |
3051 次 |
| 最近记录: |