AWS IoT Python SDK 和 asyncio

Gia*_*nis 4 python amazon-web-services mqtt python-asyncio aws-iot

我需要使用 AWS IoT MQTT 服务。我目前正在使用https://github.com/aws/aws-iot-device-sdk-python进行一些实验。

我的应用程序将使用 websockets 与另一个服务通信,然后发布/订阅 MQTT 主题来转发/接收消息。

该库是否有可能阻止代码执行?我仍然尝试了解 asyncio,但不确定我应该注意什么。我怎么知道它是否会引起问题?

我相信我只需要使用上面库中的AWSIoTMQTTClient

这是我的工作代码的摘录:

class AWSIoTClient:

    def __init__():
        ...
        self.client = AWSIoTMQTTClient(...)

    def subscribe(self, callback):
        self.client.subscribe(f'{self.TOPIC}/subscribe/', 0, callback)

    def publish(self, message):
        self.client.publish(self.TOPIC, message, 0)


class MyWSProtocol(WebSocketClientProtocol):

    def set_aws_client(self, client: AWSIoTClient):
        client.subscribe(self.customCallback)
        self.client = client

    def customCallback(self, client, userdata, message):
        # This will be called when we send message from AWS
        if message.payload:
            message = json.loads(message.payload.decode('utf-8').replace("'", '"'))
            message['id'] = self.next_id()
            self.sendMessage(json.dumps(message).encode('utf-8'))

    def onMessage(self, payload, isBinary):
        message = json.loads(payload)

        # This will forward message to AWS
        self.client.publish(str(payload))
Run Code Online (Sandbox Code Playgroud)

Mik*_*mov 5

该库是否有可能阻止代码执行?

我怎么知道它是否会引起问题?

您不应该允许在任何协程中存在长时间运行的阻塞(同步)代码。它将导致阻塞全局事件循环并进一步阻塞所有地方的协程。

async def main():
    await asyncio.sleep(3)  # async sleeping, it's ok

    time.sleep(3)           # synchronous sleeping, this freezes event loop 
                            # and all coroutines for 3 seconds, 
                            # you should avoid it!
    
    await asyncio.sleep(3)  # async sleeping, it's ok
Run Code Online (Sandbox Code Playgroud)

如果您需要在协程内运行阻塞代码,您应该在执行器中执行(请阅读此处)。

当你编写协程时你应该记住这一点,但如果你启用调试模式,通常 asyncio 会警告你这个错误:

import asyncio
import time


async def main():
    await asyncio.sleep(3)
    time.sleep(3)
    await asyncio.sleep(3)


loop = asyncio.get_event_loop()
loop.set_debug(True)  # debug mode
try:
    loop.run_until_complete(main())
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
Run Code Online (Sandbox Code Playgroud)

你会看到警告:

Executing <Handle <TaskWakeupMethWrapper object at 0x000002063C2521F8>(<Future finis...events.py:275>) created at C:\Users\gmn\AppData\Local\Programs\Python\Python36\Lib\asyncio\futures.py:348> took 3.000 seconds
Run Code Online (Sandbox Code Playgroud)