python在同一连接上套接字多个消息

Dim*_*urg 4 python sockets

我的问题更笼统而不是具体。我想实现一个简单的客户端服务器应用程序,只是将消息从客户端传递到服务器,并从服务器获取确认。

我想知道在使用套接字进行操作时必须考虑什么,我是否必须实现自己的通信接口并管理同一连接上的消息传递或为每个消息创建一个新连接?

(请假设当前的消息小于BUFFER_SIZE)

代码是这样的:

server.py

server_info = (HOST, PORT)
sock = socket.socket(family=AF_INET, type=SOCK_STREAM)
sock.bind(server_info)
sock.listen(NUMBER_OF_SOCKETS)
try:
    while True:
        connection, client_address = sock.accept()
        try:
            while True:
                data = connection.recv(BUFFER_SIZE)
                print('message received: {data}'.format(data=data))
                connection.send("ok")
        finally:
            connection.close()
Run Code Online (Sandbox Code Playgroud)

client.py

server_info = (HOST, PORT)
sock = socket.socket(family=AF_INET, type=SOCK_STREAM)
sock.connect(server_info)
try:
    print("connection established")
    while True:
        print("Please enter a message you want to pass to the server")
        msg = raw_input()

        print('sending "{message}"'.format(message=msg))
        sock.send(msg)

        while True:
            data = sock.recv(constants.BUFFER_SIZE)
            print('received "{data}"'.format(data=data))
            break

finally:
    print('closing socket')
    sock.close()
Run Code Online (Sandbox Code Playgroud)

此代码使我能够在服务器端接收多个消息,并从客户端发送多个消息。这是正确的方法吗?为此,我必须在客户端进行2次无限循环,关闭连接又如何呢?当我发送0字节消息时,服务器和客户端都会卡住。

非常感谢你!

Oha*_*Lad 6

添加两种类型的服务器-客户端,一种是通过多进程进行的,另一种是异步的,它们的作用几乎相同,异步的一种更为健壮,请在此处阅读原因: 线程与异步

我的示例:使用多进程:

import multiprocessing
import socket
import time

HOST = "0.0.0.0"
PORT = 9000


def handle(connection, address):

    try:
        while True:
            data = connection.recv(1024)
            connection.sendall(data + ' server time {}'.format(time.time()))
    except:
        pass
    finally:
        connection.close()


class Server(object):

    def __init__(self, hostname, port):
        self.hostname = hostname
        self.port = port

    def start(self):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind((self.hostname, self.port))
        self.socket.listen(1)

        while True:
            conn, address = self.socket.accept()
            process = multiprocessing.Process(
                target=handle, args=(conn, address))
            process.daemon = True
            process.start()


if __name__ == "__main__":
    server = Server(HOST, PORT)
    try:
        print 'start'
        server.start()
    except:
        print 'something wrong happened, a keyboard break ?'
    finally:
        for process in multiprocessing.active_children():
            process.terminate()
            process.join()
    print 'Goodbye'
Run Code Online (Sandbox Code Playgroud)

和它的客户:

    import sys
import threading
import time
import socket

SOCKET_AMOUNT = 100
HOST = "localhost"
PORT = 9000


def myclient(ip, port, message):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((ip, port))
    sock.sendall(message)
    result = sock.recv(1024)
    print result + ' final clnt time {}'.format(time.time())
    sock.close()

if __name__ == "__main__":
    thread_list = []
    for i in range(SOCKET_AMOUNT):
        msg = "Thread #{}, clnt time {}".format(i, time.time())
        client_thread = threading.Thread(
            target=myclient, args=(HOST, PORT, msg))
        thread_list.append(client_thread)
        client_thread.start()

    waiting = time.time()
    [x.join() for x in thread_list]
    done = time.time()
    print 'DONE {}. Waiting for {} seconds'.format(done, done-waiting)
Run Code Online (Sandbox Code Playgroud)

下一台服务器要强大得多!数据不会丢失!服务器:

import asyncore
import socket
import time
import logging
import json


class Server(asyncore.dispatcher):

    def __init__(self, host, port):

        self.logger = logging.getLogger('SERVER')
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(('', port))
        self.listen(confjson.get('SERVER_QUEUE_SIZE', None))
        self.logger.debug('binding to {}'.format(self.socket.getsockname()))

    def handle_accept(self):
        socket, address = self.accept()
        self.logger.debug('new connection accepted')
        EchoHandler(socket)


class EchoHandler(asyncore.dispatcher_with_send):

    def handle_read(self):

        msg = self.recv(confjson.get('RATE', None))
        self.out_buffer = msg
        self.out_buffer += ' server recieve: {}'.format(time.time())
        if not self.out_buffer:
            self.close()


if __name__ == "__main__":

    logging.basicConfig(level=logging.DEBUG,
                        format='%(name)s: %(message)s',
                        )
    with open('config.json', 'r') as jfile:
        confjson = json.load(jfile)
    try:
        logging.debug('Server start')
        server = Server(confjson.get('HOST', None),
                        confjson.get('PORT', None))
        asyncore.loop()
    except:
        logging.error('Something happened,\n'
                      'if it was not a keyboard break...\n'
                      'check if address taken, '
                      'or another instance is running. Exit')
    finally:
        logging.debug('Goodbye')
Run Code Online (Sandbox Code Playgroud)

和异步客户端:

import asyncore
import socket
import time
import logging
import json


class Client(asyncore.dispatcher_with_send):

    def __init__(self, host, port, message, pk):
        self.logger = logging.getLogger('CLIENT')
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.host = host
        self.port = port
        self.connect((host, port))
        self.out_buffer = message
        self.clientID = pk
        self.logger.debug('Connected #{}'.format(self.clientID))

    def handle_close(self):
        self.close()

    def handle_read(self):
        rec_msg = self.recv(confjson.get('RATE', None))
        self.logger.debug('#{}, {} back at client {}'.format(self.clientID,
                                                             rec_msg,
                                                             time.time()
                                                             )
                          )
        self.close()


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG,
                        format='%(name)s: %(message)s',
                        )

    with open('config.json', 'r') as jfile:
        confjson = json.load(jfile)
    clients = []
    for idx in range(confjson.get('SOCKET_AMOUNT', None)):
        msg = "Start: {}".format(time.time())
        clients.append(Client(confjson.get('HOST', None),
                              confjson.get('PORT', None),
                              msg,
                              idx)
                       )
    start = time.time()
    logging.debug(
        'Starting async loop for all connections, unix time {}'.format(start))
    asyncore.loop()
    logging.debug('{}'.format(time.time() - start))
Run Code Online (Sandbox Code Playgroud)

和一个小的配置文件:

{
    "HOST": "127.0.0.1",
    "PORT": 5007,
    "RATE": 8096,
    "SERVER_QUEUE_SIZE": 16,
    "SOCKET_AMOUNT": 100
}
Run Code Online (Sandbox Code Playgroud)

  • 所以我不得不稍微修改一下代码,因为 python 3.6.5 不允许传输字符串。现在必须对它们进行编码/解码才能发送信号。但是当将 RPi 连接到 Windows 10 python 时它工作得很好 (2认同)