使用队列的Asyncio持久(persistent)客户端协议类

thi*_*ezn 8 python tcp python-asyncio

我试图了解Python 3的asyncio模块,特别是使用传输/协议(transport/protocol)API。我想创建一个发布/订阅模式,并使用asyncio.Protocol类来创建我的客户端和服务器。

目前,我已启动并运行服务器,并侦听传入的客户端连接.客户端能够连接到服务器,发送消息并接收回复.

我希望能够保持TCP连接存活,并维护一个可以随时向其中添加消息的队列。我试图找到一种使用低级API(传输/协议)实现这一点的方法,但网上为数不多的asyncio文档/示例似乎都只涉及高级API——使用流(streams)等等。有人能为我指出实现这个目标的正确方向吗?

这是服务器代码:

#!/usr/bin/env python3

import asyncio
import json


class SubscriberServerProtocol(asyncio.Protocol):
    """Server-side protocol that answers subscribe/unsubscribe requests.

    Each request is expected to be a JSON object and is answered with a
    compact JSON reply written back on the same transport.
    """

    def connection_made(self, transport):
        """Store the transport and the peer address of the new connection."""

        self.peername = transport.get_extra_info('peername')
        print('connection from {}'.format(self.peername))
        self.transport = transport

    def data_received(self, data):
        """Parse one request and write the matching reply.

        The protocol expects a JSON object containing the fields:

            type:       subscribe/unsubscribe
            channel:    the name of the channel

        The reply is a JSON object with the fields:

            type:           subscribe/unsubscribe/unknown_type
            channel:        the channel named in the request
            channel_count:  a placeholder count (10 or 9)

        Anything that cannot be understood (undecodable bytes, invalid
        JSON, a JSON value that is not an object, a missing ``channel``,
        or an unrecognised ``type``) is answered with
        ``{"type": "unknown_type"}`` instead of raising.

        NOTE(review): no message framing is done here, so every call is
        treated as exactly one complete JSON document.
        """

        # UnicodeDecodeError and json.JSONDecodeError are both ValueError
        # subclasses, so a single handler covers bad bytes and bad JSON.
        try:
            recv_message = json.loads(data.decode())
        except ValueError:
            recv_message = None

        if isinstance(recv_message, dict):
            msg_type = recv_message.get('type')
            has_channel = 'channel' in recv_message
        else:
            msg_type = None
            has_channel = False

        # Check the message type and subscribe/unsubscribe to the
        # channel, then inform the client of the outcome.
        if msg_type == 'subscribe' and has_channel:
            print('Client {} subscribed to {}'.format(self.peername,
                                                      recv_message['channel']))
            send_message = json.dumps({'type': 'subscribe',
                                       'channel': recv_message['channel'],
                                       'channel_count': 10},
                                      separators=(',', ':'))
        elif msg_type == 'unsubscribe' and has_channel:
            print('Client {} unsubscribed from {}'
                  .format(self.peername, recv_message['channel']))
            send_message = json.dumps({'type': 'unsubscribe',
                                       'channel': recv_message['channel'],
                                       'channel_count': 9},
                                      separators=(',', ':'))
        else:
            print('Invalid message type {}'.format(msg_type))
            send_message = json.dumps({'type': 'unknown_type'},
                                      separators=(',', ':'))

        print('Sending {!r}'.format(send_message))
        self.transport.write(send_message.encode())

    def eof_received(self):
        """Close the transport once the client has sent EOF."""
        print('Client {} closed connection'.format(self.peername))
        self.transport.close()

    def connection_lost(self, exc):
        """Report the error, if any, that ended the connection.

        ``exc`` is falsy when the connection was closed without an error;
        in that case nothing is printed.
        """
        if exc:
            print('{} {}'.format(exc, self.peername))


# Create and install the event loop explicitly: asyncio.get_event_loop()
# is deprecated when no loop is current.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Each client will create a new protocol instance
coro = loop.create_server(SubscriberServerProtocol, '127.0.0.1', 10666)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    # Close the server. The original called the non-existent
    # loop.until_complete() and hid the AttributeError behind a bare
    # except, so wait_closed() never ran and the loop was never closed.
    try:
        server.close()
        loop.run_until_complete(server.wait_closed())
    finally:
        loop.close()
Run Code Online (Sandbox Code Playgroud)

这是客户端代码:

#!/usr/bin/env python3

import asyncio
import json


class SubscriberClientProtocol(asyncio.Protocol):
    """Client protocol that sends a single message once connected."""

    def __init__(self, message, loop):
        """Keep the outgoing message text and the loop to stop later."""
        self.loop, self.message = loop, message

    def connection_made(self, transport):
        """Write the stored message to the server and report it.

        The message is expected to be JSON text with the items:
            type:       subscribe/unsubscribe
            channel:    the name of the channel
        """
        payload = self.message.encode()
        transport.write(payload)
        print('Message sent: {!r}'.format(self.message))

    def data_received(self, data):
        """Print the decoded reply received from the server.

        The reply is expected to consist of three fields:
            type:           subscribe/unsubscribe
            channel:        the name of the channel
            channel_count:  the amount of channels subscribed to
        """
        reply = data.decode()
        print('Message received: {!r}'.format(reply))

    def connection_lost(self, exc):
        """Announce the disconnect and stop the event loop."""
        for line in ('The server closed the connection',
                     'Stop the event loop'):
            print(line)
        self.loop.stop()

if __name__ == '__main__':
    # Compact JSON subscribe request sent as soon as the connection is up.
    subscribe_request = {'type': 'subscribe', 'channel': 'sensor'}
    message = json.dumps(subscribe_request, separators=(',', ':'))

    loop = asyncio.get_event_loop()

    def protocol_factory():
        """Build the protocol instance for the new connection."""
        return SubscriberClientProtocol(message, loop)

    coro = loop.create_connection(protocol_factory, '127.0.0.1', 10666)
    loop.run_until_complete(coro)

    # Run until the protocol stops the loop or the user presses Ctrl+C.
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print('Closing connection')
    loop.close()
Run Code Online (Sandbox Code Playgroud)

dan*_*ano 12

对于你想做的事情,你的服务器代码保持原样即可;你写的代码实际上已经让TCP连接保持存活,只是缺少持续向它提供新消息的机制。要做到这一点,需要调整客户端代码,使你可以随时向它提供新消息,而不是只在connection_made回调触发时发送一次。

这很容易:我们将在ClientProtocol内部添加一个asyncio.Queue,用来接收消息;然后运行一个协程,在无限循环中从该Queue取出消息并发送到服务器。最后一步是保存create_connection调用返回的ClientProtocol实例,并把它传给实际发送消息的协程。

import asyncio
import json

class SubscriberClientProtocol(asyncio.Protocol):
    """Client protocol that sends queued messages over one connection.

    Messages are handed over with ``send_message()`` and stored in an
    ``asyncio.Queue``. A background task started in ``__init__`` waits
    until the connection is made and then writes each queued message to
    the transport.
    """

    def __init__(self, loop):
        """Create the queue and start the sender task on *loop*."""
        self.transport = None
        self.loop = loop
        self.queue = asyncio.Queue()
        self._ready = asyncio.Event()
        # ``asyncio.async`` is a SyntaxError since ``async`` became a
        # keyword; schedule the sender on the loop we were given instead.
        self._sender = loop.create_task(self._send_messages())

    async def _send_messages(self):
        """ Send messages to the server as they become available. """
        await self._ready.wait()
        print("Ready!")
        while True:
            data = await self.queue.get()
            self.transport.write(data.encode('utf-8'))
            print('Message sent: {!r}'.format(data))

    def connection_made(self, transport):
        """Store the transport and release the sender task.

        Nothing is written here; messages queued so far are sent by the
        sender task once it resumes.
        """
        self.transport = transport
        print("Connection made.")
        self._ready.set()

    async def send_message(self, data):
        """ Feed a message (a ``str``) to the sender coroutine. """
        await self.queue.put(data)

    def data_received(self, data):
        """Print the decoded reply received from the server.

        The reply is expected to consist of three fields:
            type:           subscribe/unsubscribe
            channel:        the name of the channel
            channel_count:  the amount of channels subscribed to
        """
        print('Message received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        """Cancel the sender task and stop the event loop."""
        print('The server closed the connection')
        print('Stop the event loop')
        # Without this the sender stays pending forever on queue.get().
        self._sender.cancel()
        self.loop.stop()

async def feed_messages(protocol):
    """ An example function that sends the same message repeatedly.

    Builds one compact JSON subscribe request and passes it to
    ``protocol.send_message()`` once per second, forever. The coroutine
    only ends when it is cancelled.

    Written as a native coroutine because ``@asyncio.coroutine`` and
    generator-based coroutines were removed from asyncio.
    """
    message = json.dumps({'type': 'subscribe', 'channel': 'sensor'},
                         separators=(',', ':'))
    while True:
        await protocol.send_message(message)
        await asyncio.sleep(1)

if __name__ == '__main__':
    # Module-level name; kept because other code in this file may read it.
    message = json.dumps({'type': 'subscribe', 'channel': 'sensor'},
                         separators=(',', ':'))

    # Create and install the event loop explicitly: asyncio.get_event_loop()
    # is deprecated when no loop is current.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    coro = loop.create_connection(lambda: SubscriberClientProtocol(loop),
                                  '127.0.0.1', 10666)
    # create_connection() yields (transport, protocol); only the protocol
    # is needed to feed it messages.
    _, proto = loop.run_until_complete(coro)
    # ``asyncio.async`` is a SyntaxError since ``async`` became a keyword;
    # loop.create_task() schedules the feeder on this loop.
    loop.create_task(feed_messages(proto))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print('Closing connection')
    finally:
        loop.close()
Run Code Online (Sandbox Code Playgroud)