Windows上的Python多处理和网络

Tra*_*uer 7 python sockets windows networking multithreading

我正在尝试实现一个tcp'echo server'.简单的东西:

  1. 客户端向服务器发送消息.
  2. 服务器接收消息
  3. 服务器将消息转换为大写
  4. Server将修改后的消息发送给客户端
  5. 客户端打印响应.

它工作得很好,所以我决定并行化服务器; 使它能够及时处理多个客户端.由于大多数Python解释器都有GIL,因此多线程不会削减它.我不得不使用多处理器......而男孩,这就是事情发生的原因.

我正在使用Windows 10 x64和WinPython适用于Python 3.5.2 x64.

我的想法是创建一个套接字,初始化它(绑定和监听),创建子进程并将套接字传递给子进程.但是对于我的爱...我无法做到这一点,我的子过程几乎立即死亡.最初我有一些问题'腌制'套接字......所以我google了一下,并认为这是问题.所以我尝试通过管道将多个处理队列传递给我的套接字,我的最后一次尝试是'forkpickling'并在处理创建过程中将其作为字节对象传递.什么都行不通.

有人可以在这里说清楚吗?告诉我出了什么事? 也许整个想法(共享套接字)是坏的...如果是这样,请告诉我如何实现我的初始目标:使我的服务器能够立即处理多个客户端(在Windows上)(不要告诉我有关线程的信息) ,我们都知道python的线程不会削减它¬¬)

还值得注意的是,调试功能不会创建任何文件.我相信没有任何过程可以长时间运行它.

我的服务器代码的典型输出是(只有运行之间的差异是进程号):

Server is running...
Degree of parallelism: 4
Socket created.
Socket bount to: ('', 0)
Process 3604 is alive: True
Process 5188 is alive: True
Process 6800 is alive: True
Process 2844 is alive: True

Press ctrl+c to kill all processes.

Process 3604 is alive: False
Process 3604 exit code: 1
Process 5188 is alive: False
Process 5188 exit code: 1
Process 6800 is alive: False
Process 6800 exit code: 1
Process 2844 is alive: False
Process 2844 exit code: 1
The children died...
Why god?
WHYYyyyyy!!?!?!?
Run Code Online (Sandbox Code Playgroud)

服务器代码:

# Imports
import socket 
import packet
import sys
import os
from time import sleep
import multiprocessing as mp
import pickle
import io

# Constants
DEGREE_OF_PARALLELISM = 4
DEFAULT_HOST = ""
DEFAULT_PORT = 0

def _parse_cmd_line_args():
    arguments = sys.argv
    if len(arguments) == 1:
        return DEFAULT_HOST, DEFAULT_PORT
    else:
        raise NotImplemented()

def debug(data):
    pid = os.getpid()
    with open('C:\\Users\\Trauer\\Desktop\\debug\\'+str(pid)+'.txt', mode='a',
              encoding='utf8') as file:
        file.write(str(data) + '\n')

def handle_connection(client):
    client_data = client.recv(packet.MAX_PACKET_SIZE_BYTES)
    debug('received data from client: ' + str(len(client_data)))
    response = client_data.upper()
    client.send(response)    
    debug('sent data from client: ' + str(response))

def listen(picklez):    
    debug('started listen function')

    pid = os.getpid()
    server_socket = pickle.loads(picklez)
    debug('acquired socket')

    while True:
        debug('Sub process {0} is waiting for connection...'.format(str(pid)))

        client, address = server_socket.accept()
        debug('Sub process {0} accepted connection {1}'.format(str(pid),
              str(client)))

        handle_connection(client)        
        client.close()
        debug('Sub process {0} finished handling connection {1}'.
              format(str(pid),str(client)))

if __name__ == "__main__":    
#   Since most python interpreters have a GIL, multithreading won't cut
#   it... Oughta bust out some process, yo!
    host_port = _parse_cmd_line_args()
    print('Server is running...')
    print('Degree of parallelism: ' + str(DEGREE_OF_PARALLELISM))

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print('Socket created.')

    server_socket.bind(host_port)
    server_socket.listen(DEGREE_OF_PARALLELISM)
    print('Socket bount to: ' + str(host_port))        

    buffer = io.BytesIO()
    mp.reduction.ForkingPickler(buffer).dump(server_socket)
    picklez = buffer.getvalue()

    children = []
    for i in range(DEGREE_OF_PARALLELISM):        
        child_process = mp.Process(target=listen, args=(picklez,))
        child_process.daemon = True
        child_process.start()
        children.append(child_process)

        while not child_process.pid:
            sleep(.25)

        print('Process {0} is alive: {1}'.format(str(child_process.pid), 
              str(child_process.is_alive())))     
    print()    

    kids_are_alive = True
    while kids_are_alive:
        print('Press ctrl+c to kill all processes.\n')
        sleep(1) 

        exit_codes = []
        for child_process in children:
            print('Process {0} is alive: {1}'.format(str(child_process.pid), 
              str(child_process.is_alive())))
            print('Process {0} exit code: {1}'.format(str(child_process.pid), 
              str(child_process.exitcode)))
            exit_codes.append(child_process.exitcode)

        if all(exit_codes):
            # Why do they die so young? :(
            print('The children died...')
            print('Why god?')
            print('WHYYyyyyy!!?!?!?')
            kids_are_alive = False
Run Code Online (Sandbox Code Playgroud)

编辑:修复了"听"的签名.我的进程仍然立即死亡.

edit2:用户cmidi指出这段代码适用于Linux; 所以我的问题是:如何在Windows上"完成这项工作"?

Ery*_*Sun 3

您可以直接将套接字传递给子进程。多处理为此注册了一个减少,为此 Windows 实现使用以下DupSocketmultiprocessing.resource_sharer

class DupSocket(object):
    '''Picklable wrapper for a socket.'''
    def __init__(self, sock):
        new_sock = sock.dup()
        def send(conn, pid):
            share = new_sock.share(pid)
            conn.send_bytes(share)
        self._id = _resource_sharer.register(send, new_sock.close)

    def detach(self):
        '''Get the socket.  This should only be called once.'''
        with _resource_sharer.get_connection(self._id) as conn:
            share = conn.recv_bytes()
            return socket.fromshare(share)
Run Code Online (Sandbox Code Playgroud)

这将调用 Windows 套接字share方法,该方法从调用中返回协议信息缓冲区WSADuplicateSocket。它向资源共享器注册,以通过连接将此缓冲区发送到子进程。子进程依次调用detach,接收协议信息缓冲区并通过 重建套接字socket.fromshare

它与您的问题没有直接关系,但我建议您重新设计服务器以在主进程中调用accept,这是通常完成的方式(例如在Python的socketserver.ForkingTCPServer模块中)。通过 a 将生成的(conn, address)元组传递给第一个可用的工作人员multiprocessing.Queue,该工作人员由进程池中的所有工作人员共享。或者考虑使用multiprocessing.Poolwith apply_async