Python 中多个 websocket 客户端连接的最佳方法?

use*_*954 5 python websocket

我很高兴我要问的问题相当广泛,但是,作为 Python 的新手,我正在努力寻找[最佳]方法来完成一些事情,这在 Node.js 中是微不足道的,而在 Node.js 中则非常微不足道。其他环境,例如 C#。

假设有一个装满东西的仓库。假设该仓库有一个具有两个特征的 Websocket 接口:在客户端连接上,它会输出仓库当前库存的完整列表,然后在库存发生变化时进一步进行流式更新。

网络上有很多关于如何在 Python 中连接到仓库并响应其状态变化的示例。但...

如果我想连接到两个仓库并根据从每个仓库分别检索的组合信息执行某些操作,该怎么办?如果我想根据时间等因素来做事情,而不是仅仅由库存变化和传入的 websocket 消息驱动,该怎么办?

在我见过的所有例子中 - 开始感觉有数百个 - 在某个地方,以某种形式,有 arun()或 arun_forever()或 arun_until_complete()等。换句话说,I/O 可能是异步的,但总有一个代码中存在大量阻塞操作,并且始终有两个不适合我的情况的基本假设:只有一个 websocket 连接,并且所有处理都将由[单个] websocket 服务器发送的事件驱动。

我非常不清楚我的问题的答案是否是使用多个事件循环、多线程或其他东西。

迄今为止,尝试 Python 感觉就像在顶层公寓的地板上,欣赏着古怪但无可否认的优雅装饰。但当你走进电梯,按下标有“并行”或“并发”的按钮,电梯就会自由落体,最终把你送到一个充满了一些相当丑陋且冒着热气的管道的地下室。

...从华丽的隐喻回到技术上,我正在努力解决的关键问题是相当于 Node.js 代码的 Python,它可能像下面的示例一样简单[为了简单起见,显得不优雅]:

var aggregateState = { ... some sort of representation of combined state ... };

var socket1 = new WebSocket("wss://warehouse1");
socket1.on("message", OnUpdateFromWarehouse);

var socket2 = new WebSocket("wss://warehouse2");
socket2.on("message", OnUpdateFromWarehouse);

function OnUpdateFromWarehouse(message)
{
  ... Take the information and use it to update aggregate state from both warehouses ...
}


Run Code Online (Sandbox Code Playgroud)

use*_*954 5

回答我自己的问题,希望它可以帮助其他Python新手......asyncio似乎是要走的路(尽管存在一些陷阱,例如您可以轻松地使事件循环陷入僵局)。

假设使用异步友好的 websocket 模块(例如websockets ),似乎有效的是一个遵循以下原则的框架 - 为了简单起见,删除了诸如重新连接之类的逻辑。(前提仍然是一个仓库,它发送其完整库存的初始列表,然后发送对该初始状态的更新。)

class Warehouse:
    def __init__(self, warehouse_url):
        self.warehouse_url = warehouse_url
        self.inventory = {}  # Some description of the warehouse's inventory
    
    async def destroy():
        if (self.websocket.open):
            self.websocket.close()  # Terminates any recv() in wait_for_incoming() 
            await self.incoming_message_task  # keep asyncio happy by awaiting the "background" task

    async def start(self):
        try:
            # Connect to the warehouse
            self.websocket = await connect(self.warehouse_url)          
            # Get its initial message which describes its full state
            initial_inventory = await self.websocket.recv()
            # Store the initial inventory
            process_initial_inventory(initial_inventory)
            # Set up a "background" task for further streaming reads of the web socket
            self.incoming_message_task = asyncio.create_task(self.wait_for_incoming())
            # Done
            return True
        except:
            # Connection failed (or some unexpected error)
            return False

    async def wait_for_incoming(self):
        while self.websocket.open:
            try:
                update_message = await self.websocket.recv()
                asyncio.create_task(self.process_update_message(update_message))
            except:
                # Presumably, socket closure
                pass

    def process_initial_inventory(self, initial_inventory_message):
        ... Process initial_inventory_message into self.inventory ...
    
    async def process_update_message(self, update_message):
        ... Merge update_message into self.inventory ...
        ... And fire some sort of event so that the object's 
        ... creator can detect the change. There seems to be no ...
        ... consensus about what is a pythonic way of implementing events, ... 
        ... so I'll declare that - potentially trivial - element as out-of-scope ...

Run Code Online (Sandbox Code Playgroud)

完成初始连接逻辑后,关键的一件事是设置一个“后台”任务,该任务重复读取通过 Websocket 传入的进一步更新消息。上面的代码不包括任何事件的触发,但是有各种各样的方法process_update_message()可以做到这一点(其中许多都非常简单),允许对象的创建者在任何时候以它认为合适的方式处理通知。只要对象的创建者继续与 asyncio 良好地配合并参与协作多任务处理,就将继续接收流消息,并且将继续触发任何事件。

完成后,可以按照以下方式建立连接:

async def main():
    warehouse1 = Warehouse("wss://warehouse1")
    if await warehouse1.start():
        ... Connection succeeded. Update messages will now be processed 
        in the "background" provided that other users of the event loop 
        yield in some way ...
    else:
        ... Connection failed ...

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

可以通过多种方式启动多个仓库,包括对create_task(warehouse.start())每个仓库执行一次操作,然后对gather任务执行一次操作,以确保/检查它们是否都正常。

当需要退出时,为了让 asyncio 保持快乐,并停止它抱怨孤立任务,并让一切顺利关闭,有必要调用destroy()每个仓库。

但有一个共同点并未涵盖。扩展上面的原始前提,假设仓库还接受来自我们的 websocket 客户端的请求,例如“将 X 运送到 Y”。对这些请求的成功/失败响应将与一般更新消息一起出现;通常不可能保证请求的 send() 之后的第一个 receive() 将是对该请求的响应。这让事情变得复杂了process_update_message()

我找到的最佳答案可能被认为是“pythonic”,也可能不被认为是“pythonic”,因为它使用的Future方式与TaskCompletionSource.NET 中的 a 非常相似。

让我们发明一些实现细节;任何现实世界的场景都可能是这样的:

  • 我们可以在向仓库提交指令时提供request_id
  • 来自仓库的成功/失败响应将 request_id 重复返回给我们(因此也区分了命令响应消息与库存更新消息)

第一步是创建一个字典,将待处理的、正在进行的请求的 ID 映射到Future对象:

    def __init__(self, warehouse_url):
        ...
        self.pending_requests = {}
Run Code Online (Sandbox Code Playgroud)

发送请求的协程的定义如下所示:

    async def send_request(self, some_request_definition)
        # Allocate a unique ID for the  request
        request_id = <some unique request id>
        # Create a Future for the pending request
        request_future = asyncio.Future()
        # Store the map of the ID -> Future in the dictionary of pending requests
        self.pending_requests[request_id] = request_future
        # Build a request message to send to the server, somehow including the request_id
        request_msg = <some request definition, including the request_id>
        # Send the message 
        await self.websocket.send(request_msg) 
        # Wait for the future to complete - we're now asynchronously awaiting
        # activity in a separate function
        await asyncio.wait_for(command_future, timeout = None)
        # Return the result of the Future as the return value of send_request()
        return request_future.result()
Run Code Online (Sandbox Code Playgroud)

调用者可以使用如下内容创建请求并等待其异步响应:

     some_result = await warehouse.send_request(<some request def>)
Run Code Online (Sandbox Code Playgroud)

使这一切顺利进行的关键是修改和扩展 process_update_message()以执行以下操作:

  • 区分请求响应与库存更新
  • 对于前者,提取请求 ID(我们发明的场景称该 ID 会重复返回给我们)
  • 查找待处理Future的请求
  • 对它执行 a set_result()(其值可以是任何值,具体取决于服务器的响应内容)。这会释放send_request()并导致它的等待得到解决。

例如:

    async def process_update_message(self, update_message):
        if <some test that update_message is a request response>:
            request_id = <extract the request ID repeated back in update_message>
            # Get the Future for this request ID
            request_future = self.pending_requests[request_id]
            # Create some sort of return value for send_request() based on the response
            return_value = <some result of the request>
            # Complete the Future, causing send_request() to return
            request_future.set_result(return_value)
        else:
            ... handle inventory updates as before ...
Run Code Online (Sandbox Code Playgroud)


ti7*_*ti7 1

我没有使用带有 asyncio 的套接字,但您可能只是在寻找 asyncioopen_connection

async def socket_activity(address, callback):
    reader, _ = await asyncio.open_connection(address)
    while True:
        message = await reader.read()
        if not message:  # empty bytes on EOF
            break  # connection was closed
        await callback(message)
Run Code Online (Sandbox Code Playgroud)

然后将它们添加到事件循环中

    tasks = []  # keeping a reference prevents these from being garbage collected
    for address in ["wss://warehouse1", "wss://warehouse2"]:
        tasks.append(asyncio.create_task(
            socket_activity(address, callback)
        ))
    # return tasks  # or work with them
Run Code Online (Sandbox Code Playgroud)

如果你想在协程中等待直到 N 个操作完成,你可以使用.gather()

或者,您可能会发现Tornado可以做您想要的一切,甚至更多(我的答案基于此)
Tornado websocket 客户端:如何异步 on_message?(从未等待过协程)