如何在 python 中编写多处理 Web 服务器

Zei*_*zar 2 python webserver multiprocessing

我在 python 中有一个简单的 web 服务器,它根据一些配置响应请求。配置定义的百分比OKNOKTimeoutNull响应:

import socket
import sys
import os
import datetime
import random
import time


# define globals
global log_file
global configs

dash = '-'
sep = '\n' + 100 * dash + '\n'
ok_message = 'HTTP/1.0 200 OK\n\n'
nok_message = 'HTTP/1.0 404 NotFound\n\n'


def initialize():
    if not os.path.isdir('./logs'):
        os.mkdir(os.path.abspath('./logs'))
    path = os.path.abspath(os.path.join(os.path.abspath('./logs'),
            datetime.datetime.now().strftime('%d-%m-%Y %H-%M-%S')))
    os.mkdir(path)
    log_file = open(os.path.join(path, 'received_packets.log'), 'a')


def finalize():
    log_file.close()


def select_resp_type():
    percents = {}
    for key, val in configs.items():
        if key.endswith('Percent'):
            percents.update({key: int(val)})
    items = [x.replace('Percent', '') for x, v in percents.items()
             if (float(counts[x.replace('Percent', '')]) / counts['all_packets']) * 100 < v]
    print items
    print [(float(counts[x.replace('Percent', '')]) / counts['all_packets']) * 100 for x, v in percents.items()]
    if len(items):
        selected = random.choice(items)
        counts[selected] += 1
        return selected
    sys.stdout('Everything is done!')
    sys.exit(0)


def get_response():
    resp_type = select_resp_type()
    if resp_type == 'ok':
        return ok_message
    elif resp_type == 'nok':
        return nok_message
    elif resp_type == 'nok':
        time.sleep(int(configs['timeoutAmount']))
        return ok_message
    elif resp_type == 'nok':
        time.sleep(int(configs['timeoutAmount']))
        return None


def load_configs(config):
    if not os.path.isfile(config):
        log_file.write('No such file ' + os.path.abspath(config))
        sys.exit(1)
    config_lines = open(config, 'r').readlines()
    configs = {}
    for line in config_lines:
        if line.strip() == '' or line.strip().startswith('#'):
            continue
        configs.update({line.split('=')[0].strip(): line.split('=')[1].strip()})


if __name__ == '__main__':
    initialize()
    config = sys.argv[3]
    load_configs(config)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((str(configs['host']), int(configs['port'])))
    s.listen(1)
    try:
        while True:
            s_sock, s_addr = s.accept()
            sfile = s_sock.makefile('rw', 0)
            content = sfile.readline().strip()
            while content != '':
                log_file.write(content + sep)
                resp = get_response()
                if resp:
                sfile.write(resp)
                sfile = s_sock.makefile('rw', 0)
                content = sfile.readline().strip()
            sfile.close()
            s_sock.close()
    except:
        print 'an exception occurred!'
        sys.exit(1)
    finally:
        finalize()
Run Code Online (Sandbox Code Playgroud)

这是我的配置文件:

# server configurations
host = 127.0.0.1
port = 8000
okPercent = 80
nokPercent = 20
nullPercent = 0
timeoutPercent = 0
timeoutAmount = 120
maxClients = 10
Run Code Online (Sandbox Code Playgroud)

我想将此脚本更改为多处理(我的意思是非阻塞,以便可以处理多个请求)Web 服务器,但我不知道从哪里开始以及如何进行。有什么帮助吗?

编辑 1:

根据@Jan-Philip Gehrcke 的回答,我将脚本更改为使用gevent库:

def answer(s):
    try:
        gevent.sleep(1)
        s_sock, s_addr = s.accept()
        print conn_sep + 'Receive a connection from ' + str(s_addr)
        while True:
            content = s_sock.recv(1024)
            counts['all_packets'] += 1
            log_file.write(packet_sep + content)
            resp = get_response()
            if resp:
                s_sock.send(resp)
    except:
         print 'An error occurred in connection with ', s_addr, '; quiting...'



if __name__ == '__main__':
    log_dir = sys.argv[2]
    log_file = initialize(sys.argv[2])
    config = sys.argv[1]
    configs = load_configs(config)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((str(configs['host']), int(configs['port'])))
    s.listen(int(configs['maxClients']))
    threads = [gevent.spawn(answer, s) for i in xrange(int(configs['maxClients']))]
    gevent.joinall(threads)
Run Code Online (Sandbox Code Playgroud)

没有改变。尽管如此,如果我运行多个客户端来连接到服务器,每个客户端都应该等待之前的客户端断开连接。也许我错过了什么。任何的想法?

编辑2:

我也尝试在主块中接受请求,正如@Paul Rooney 所说:

def answer(server_sock):
    try:
        gevent.sleep(1)
        while True:
            content = server_sock.recv(1024)
            counts['all_packets'] += 1
            log_file.write(packet_sep + content)
            resp = get_response()
            if resp:
                server_sock.send(resp)
    except:
         print 'An error occurred in connection with ', s_addr, '; quiting...'



if __name__ == '__main__':
    log_dir = sys.argv[2]
    log_file = initialize(sys.argv[2])
    config = sys.argv[1]
    configs = load_configs(config)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((str(configs['host']), int(configs['port'])))
    s.listen(int(configs['maxClients']))
    s_sock, s_addr = s.accept()
    print conn_sep + 'Receive a connection from ' + str(s_addr)
    threads = [gevent.spawn(answer, s_sock) for i in xrange(int(configs['maxClients']))]
    gevent.joinall(threads)
Run Code Online (Sandbox Code Playgroud)

首先,我对并发连接有相同的结果;在以前的客户死亡之前,不会回答任何请求。其次,当第一个客户端断开连接时,我在服务器中收到以下错误并终止:

Traceback (most recent call last):
  File "/opt/python2.7/lib/python2.7/site-packages/gevent-1.0.1-py2.7-linux-x86_64.egg/gevent/greenlet.py", line 327, in run
    result = self._run(*self.args, **self.kwargs)
  File "main.py", line 149, in answer
    server_sock.send(resp)
error: [Errno 32] Broken pipe
<Greenlet at 0x1e202d0: answer(<socket._socketobject object at 0x1dedad0>)> failed with error
Run Code Online (Sandbox Code Playgroud)

似乎当第一个客户端断开连接时,它关闭了它的套接字并且该套接字不再可用;所以其他连接的等待客户端无法再回答。

Pau*_*ney 5

在最简单的层面上,您可以做的是在每次accept调用返回时生成一个新进程,并将该进程传递给由 accept 返回的客户端套接字。

您有效地将请求的处理卸载到子进程,让主进程自由处理新请求,同样将它们卸载到新的子进程。

我发现这样做的方式,我并不是说它是完美的答案,但它对我有用(Debian Python 2.7.3)。

与您的原始代码有些相似的简单示例,仅用于演示何时生成进程。

import socket
import sys
import time
import errno
from multiprocessing import Process

ok_message = 'HTTP/1.0 200 OK\n\n'
nok_message = 'HTTP/1.0 404 NotFound\n\n'

def process_start(s_sock):

    content = s_sock.recv(32)
    s_sock.send(ok_message)
    s_sock.close()
    #time.sleep(10)
    sys.exit(0) # kill the child process

if __name__ == '__main__':
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((sys.argv[1], int(sys.argv[2])))
    print 'listen on address %s and port %d' % (sys.argv[1], int(sys.argv[2]))
    s.listen(1)
    try:
        while True:
            try:
                s_sock, s_addr = s.accept()
                p = Process(target=process_start, args=(s_sock,))
                p.start()

            except socket.error:
                # stop the client disconnect from killing us
                print 'got a socket error'

    except Exception as e:
        print 'an exception occurred!',
        print e
        sys.exit(1)
    finally:
        s.close()
Run Code Online (Sandbox Code Playgroud)

需要注意的是

s_sock, s_addr = s.accept()
p = Process(target=process_start, args=(s_sock,))
p.start()
Run Code Online (Sandbox Code Playgroud)

这里是您生成一个进程以响应accept返回的地方。

def process_start(s_sock):

    content = s_sock.recv(32)
    s_sock.send(ok_message)
    s_sock.close()
    #time.sleep(10)
    sys.exit(0) # kill the child process
Run Code Online (Sandbox Code Playgroud)

这是启动新进程的函数,获取传递给它的套接字并发送响应(你会在这里做更多的事情)。然后杀死孩子。我不是 100% 确定这是杀死子进程的正确方法,或者甚至需要杀死它。如果需要,也许有人可以纠正我或编辑答案。

我可以看到,即使我取消注释 time.sleep 调用,我也可以立即从多个客户端套接字获得响应。

就系统资源和性能而言,greenlets 方式无疑是一种更好的方式。