Älh*_*hoo 5 python multiprocessing
我正在尝试在 python3.3 中实现一个服务器,该服务器预加载了一个单独的线程来对传入连接进行所有处理。
from multiprocessing import Process, Pipe, Queue
from multiprocessing.reduction import reduce_socket
import time
import socketserver,socket
def process(q):
while 1:
fn,args = q.get()
conn = fn(*args)
while conn.recv(1, socket.MSG_PEEK):
buf = conn.recv(100)
if not buf: break
conn.send(b"Got it: ")
conn.send(buf)
conn.close()
class MyHandler(socketserver.BaseRequestHandler):
def handle(self):
print("Opening connection")
print("Processing")
self.server.q.put(reduce_socket(self.request))
while self.request.recv(1, socket.MSG_PEEK):
time.sleep(1)
print("Closing connection")
class MyServer(socketserver.ForkingTCPServer):
p = Process
q = Queue()
parent_conn,child_conn = Pipe()
def __init__(self,server_address,handler):
socketserver.ForkingTCPServer.__init__(self,server_address, handler)
self.p = Process(target=process,args=(self.q,))
self.p.start()
def __del__(self):
self.p.join()
server_address = ('',9999)
myserver = MyServer(server_address,MyHandler)
myserver.serve_forever()
Run Code Online (Sandbox Code Playgroud)
我可以使用以下脚本测试它是否有效:
from multiprocessing.reduction import reduce_socket
import time
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('localhost', 9999))
time.sleep(1)
print("reduce_socket(s)")
fn,args = reduce_socket(s)
time.sleep(1)
print("rebuild_socket(s)")
conn = fn(*args)
time.sleep(1)
print("using_socket(s)")
conn.send("poks")
print conn.recv(255)
conn.send("poks")
print conn.recv(255)
conn.send("")
print conn.recv(255)
conn.close()
Run Code Online (Sandbox Code Playgroud)
不幸的是,似乎有一些问题,因为运行测试 n 次后,我的 tmp 文件夹充满了子文件夹:
$ ls /tmp/pymp*|wc -l
32000
Run Code Online (Sandbox Code Playgroud)
这些临时文件是由socket_reduce(). 有趣的是,rebuild/reduce_socket()客户端中也会创建临时文件,但一旦函数退出,它们就会被删除。我当前的 tmp 文件系统中的最大文件夹数量是 32000,这会导致问题。我可以手动或服务器中的某个位置删除 /tmp/pymp* 文件,但我想也应该有正确的方法来执行此操作。谁能帮我这个?
好的,有点修好了。来自 。./lib/python3.3/multiprocessing/util.py:
$ grep "def get_temp_dir" -B5 /usr/local/lib/python3.3/multiprocessing/util.py
#
# Function returning a temp directory which will be removed on exit
#
def get_temp_dir():
Run Code Online (Sandbox Code Playgroud)
看来临时目录应该可用,直到进程退出。由于 myprocess()和main()两者都会永远运行,因此临时文件不会被删除。为了解决这个问题,我可以创建另一个进程,将 returned_socket 交给process():
def process(q):
while 1:
fn,args = q.get()
conn = fn(*args)
while conn.recv(1, socket.MSG_PEEK):
buf = conn.recv(100)
if not buf: break
conn.send(b"Got it: ")
conn.send(buf)
conn.close()
q.put("ok")
class MyHandler(socketserver.BaseRequestHandler):
def socket_to_process(self,q):
q.put(reduce_socket(self.request))
q.get()
def handle(self):
p = Process(target=self.socket_to_process,args=(self.server.q,))
p.start()
p.join()
Run Code Online (Sandbox Code Playgroud)
这样,临时文件是在子进程中创建的,一旦process()完成输入的操作,该子进程就会退出。我不认为这是一种优雅的做法,但它确实有效。如果有人更了解,请让 stackoverflow 知道。
| 归档时间: |
|
| 查看次数: |
5122 次 |
| 最近记录: |