rad*_*duw 2 django python-asyncio django-channels
我已将 django.channels 添加到 django 项目中,以支持长时间运行的进程,这些进程通过 websockets 通知用户进度。
除了长时间运行的进程的实现似乎没有异步响应之外,一切似乎都运行良好。
为了进行测试,我创建了一个AsyncConsumer识别“run”和“isBusy”两种类型的消息。
“运行”消息处理程序设置“忙标志”,发回“进程正在运行”消息,异步等待20 秒重置“忙标志”,然后发回“进程完成消息”
“isBusy”消息返回带有忙标志状态的消息。
我的期望是,如果我发送一条运行消息,我将立即收到一条“进程正在运行”消息,20 秒后我将收到一条“进程完成”消息。这按预期工作。
我还希望如果我发送“isBusy”消息,我将立即收到带有标志状态的响应。
观察到的行为如下:
下面是 Channel 监听器的实现:
class BackgroundConsoleConsumer(AsyncConsumer):
def __init__(self, scope):
super().__init__(scope)
self.busy = False
async def run(self, message):
print("run got message", message)
self.busy = True
await self.channel_layer.group_send('consoleChannel',{
"type":"consoleResponse",
"text":"running please wait"
})
await asyncio.sleep(20)
self.busy = False
await self.channel_layer.group_send('consoleChannel',{
"type":"consoleResponse",
"text": "finished running"
})
async def isBusy(self,message):
print('isBusy got message', message)
await self.channel_layer.group_send('consoleChannel',{
"type":"consoleResponse",
"text": "process isBusy:{0}".format(self.busy)
})
Run Code Online (Sandbox Code Playgroud)
通道在路由文件中设置如下:
application = ProtocolTypeRouter({
"websocket": AuthMiddlewareStack(
URLRouter([
url("^console/$", ConsoleConsumer),
])
),
"channel": ChannelNameRouter({
"background-console":BackgroundConsoleConsumer,
}),
})
Run Code Online (Sandbox Code Playgroud)
我用一名工人运行频道(通过 ./manage.py runworker )。
实验是在 django 测试服务器上完成的(通过 runserver)。
关于为什么渠道消费者似乎没有异步工作的任何想法将不胜感激。
经过这里的一些挖掘是问题和解决方法。
通道将发送给它的消息添加到 asyncio.Queue 并按顺序处理它们。
释放协程控制(通过 aasyncio.sleep()或类似的东西)是不够的,必须在消费者收到新消息之前完成消息处理程序的处理。
这是对前一个示例的修复,其行为符合预期(即isBusy在处理run长时间运行的任务时响应消息)
谢谢@user4815162342 的建议。
class BackgroundConsoleConsumer(AsyncConsumer):
def __init__(self, scope):
super().__init__(scope)
self.busy = False
async def run(self, message):
loop = asyncio.get_event_loop()
loop.create_task(self.longRunning())
async def longRunning(self):
self.busy = True
await self.channel_layer.group_send('consoleChannel',{
"type":"the.type",
"text": json.dumps({'message': "running please wait", 'author': 'background console process'})
})
print('before sleeping')
await asyncio.sleep(20)
print('after sleeping')
self.busy = False
await self.channel_layer.group_send('consoleChannel',{
"type":"the.type",
"text": json.dumps({'message': "finished running", 'author': 'background console process'})
})
async def isBusy(self,message):
print('isBusy got message', message)
await self.channel_layer.group_send('consoleChannel',{
"type":"the.type",
"text": json.dumps({'message': "process isBusy:{0}".format(self.busy),
'author': 'background console process'})
})
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2023 次 |
| 最近记录: |