phi*_*686 8 python python-2.7 python-multiprocessing
所以我有一个生产者和消费者的系统通过无限大小的队列连接,但如果消费者反复调用get直到抛出Empty异常,它就不会清除队列.
我相信这是因为一旦套接字缓冲区已满,消费者端队列中将对象序列化到套接字中的线程就会被阻塞,因此它会一直等到缓冲区有空间,但是,对于消费者来说这是可能的调用get"太快",所以它认为队列是空的,实际上另一侧的线程有更多的数据要发送,但是不能足够快地序列化它以防止套接字对消费者显得空洞.
我相信如果我可以更改底层套接字上的缓冲区大小(我是基于Windows的),这个问题会得到缓解.据我所知,我需要做的是:
import multiprocessing.connections as conns
conns.BUFSIZE = 2 ** 16 # is typically set as 2 ** 13 for windows
import multiprocessing.Queue as q
Run Code Online (Sandbox Code Playgroud)
如果我执行上述操作,是否意味着当多重处理初始化队列时,它将使用我已在我已导入的multiprocessing.connections版本中设置的新缓冲区大小?那是对的吗?
另外我相信这只会影响windows,因为BUFSIZE不会在linux机器上使用,因为它们的所有套接字默认设置为60千字节?
有人曾尝试过这个吗?这会对窗户产生副作用吗?Windows上的套接字缓冲区大小有哪些基本限制?
===================一个代码示例来演示===================
# import multiprocessing.connection as conn
# conn.BUFSIZE = 2 ** 19
import sys
import multiprocessing as mp
from Queue import Empty
from time import sleep
total_length = 10**8
def supplier(q):
print "Starting feeder"
for i in range(total_length) :
q.put(i)
if __name__=="__main__":
queue = mp.Queue()
p = mp.Process(target=supplier, args=(queue,))
p.start()
sleep(120)
returned = []
while True :
try :
returned.append(queue.get(block=False))
except Empty :
break
print len(returned)
print len(returned) == total_length
p.terminate()
sys.exit()
Run Code Online (Sandbox Code Playgroud)
此示例在Windows上运行时,通常只会从队列中提取大约160,000个项目,因为主线程可以比供应商重新填充缓冲区更快地清空缓冲区,并最终在缓冲区为空时尝试从队列中提取报告说它是空的.
理论上,您可以通过使用更大的缓冲区来改善此问题.我认为,顶部的两行将在Windows系统上增加管道的默认缓冲区大小.
如果您对它们进行评论,那么此脚本将在退出之前提取更多数据,因为它具有更高的数据.我的主要问题是:1)这实际上是否有效.2)有没有办法让这段代码在windows和linux中使用相同大小的底层缓冲区3)为管道设置大缓冲区大小是否有任何意外的副作用.
我知道,一般来说,没有办法知道你是否从队列中提取了所有数据(假设供应商永久运行并且产生的数据非常不均匀),但我正在寻找方法来改进尽力而为.
更新:
Windows Pipe的有用链接,适用于将来需要它的人(该链接由OP,phil_20686提供):https: //msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v = vs.85)的.aspx
原著:
BUFSIZE仅在平台为win32时才有效.
multiprocessing.Queue构建在Pipe的顶部,如果更改BUFSIZE,则生成的Queue将使用更新的值.见下文:
class Queue(object):
def __init__(self, maxsize=0):
if maxsize <= 0:
maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
self._maxsize = maxsize
self._reader, self._writer = Pipe(duplex=False)
Run Code Online (Sandbox Code Playgroud)
当平台是win32时,管道代码将调用以下代码:
def Pipe(duplex=True):
'''
Returns pair of connection objects at either end of a pipe
'''
address = arbitrary_address('AF_PIPE')
if duplex:
openmode = win32.PIPE_ACCESS_DUPLEX
access = win32.GENERIC_READ | win32.GENERIC_WRITE
obsize, ibsize = BUFSIZE, BUFSIZE
else:
openmode = win32.PIPE_ACCESS_INBOUND
access = win32.GENERIC_WRITE
obsize, ibsize = 0, BUFSIZE
h1 = win32.CreateNamedPipe(
address, openmode,
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
win32.PIPE_WAIT,
1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
)
Run Code Online (Sandbox Code Playgroud)
您可以看到,当duplexFalse时,outbuffer size为0且inbuffer size为BUFSIZE.
inbuffer是为输入缓冲区保留的字节数.2**16 = 65536,它是可以在一次操作中无阻塞地写入的最大字节数量,但缓冲区大小的容量因系统而异,即使它在同一系统上也会有所不同,因此很难说侧面设置管道最大量时的效果.
| 归档时间: |
|
| 查看次数: |
2013 次 |
| 最近记录: |