Bac*_*hrc 5 python django multithreading python-3.x django-channels
我已经使用 Django Channels 工作了一个星期了,并行性让我有些烦恼runworker。
例如,我有一个 MQTT 客户端,它在收到消息时在通道中发布,基本。
async def treat_message(msg):
channel_layer = get_channel_layer()
payload = json.loads(msg.payload, encoding="utf-8")
await channel_layer.send("mqtt", {
"type": "value.change",
"message": payload
})
Run Code Online (Sandbox Code Playgroud)
这个送的很好啊 我想发送多少就发送多少,它会发送到redis队列中。到频道mqtt。
然后我运行工作程序,它将重定向队列中的消息mqtt:
python manage.py runworker mqtt
2018-09-12 16:33:42,232 - INFO - runworker - Running worker for channels ['mqtt']
Run Code Online (Sandbox Code Playgroud)
这就是问题开始的地方。以下是 AsyncConsumer 读取数据的内容:
class MQTTConsumer(AsyncConsumer):
async def value_change(self, event):
await asyncio.sleep(5)
print("I received changes : {}".format(event["message"]))
Run Code Online (Sandbox Code Playgroud)
我睡觉是为了模拟任务的业务。这就是我要去的地方:异步消费者不是多线程的!当我向通道发送两条消息时,消费者需要 10 秒来处理第二条消息,而不是多线程时的 5 秒。如下所示。
2018-09-12 16:45:25,271 - INFO - runworker - Running worker for channels ['mqtt']
2018-09-12 16:45:32,559 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:37,561 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:42,563 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:47,565 - INFO - mqtt - I received changes : {'oui': 'non'}
Run Code Online (Sandbox Code Playgroud)
有关该主题的任何信息都会有很大帮助,提前致谢!
编辑:我发现管理它的唯一方法是创建一个执行器,其中包含执行异步操作的工作人员。但我不确定其部署效率
def handle_mqtt(event):
time.sleep(3)
logger.info("I received changes : {}".format(event["message"]))
class MQTTConsumer(AsyncConsumer):
def __init__(self, scope):
super().__init__(scope)
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
async def value_change(self, event):
loop = asyncio.get_event_loop()
future = loop.run_in_executor(self.executor, handle_mqtt, event)
Run Code Online (Sandbox Code Playgroud)
目前这是设计使然
是的,这就是预期的设计,因为它是最安全的方式(如果您不知道的话,它可以防止竞争条件)。如果您愿意并行运行消息,只需在需要时分离您自己的协程(使用
asyncio.create_task),确保清理它们并等待它们关闭。这是相当大的开销,因此希望我们将来能够为消费者提供选择加入模式,但目前我们提供的只是安全选项。
https://github.com/django/channels/issues/1203
| 归档时间: |
|
| 查看次数: |
2204 次 |
| 最近记录: |