背景
我有两个需要相互通信的python进程.通信由名为Pipe的类处理.我为此创建了一个单独的类,因为需要传递的大多数信息都是以字典的形式出现的,因此Pipe实现了一个非常简单的协议.
这是Pipe构造函数:
def __init__(self,sPath):
"""
create the fifo. if it already exists just associate with it
"""
self.sPath = sPath
if not os.path.exists(sPath):
try:
os.mkfifo(sPath)
except:
raise Exception('cannot mkfifo at path \n {0}'.format(sPath))
self.iFH = os.open(sPath,os.O_RDWR | os.O_NONBLOCK)
self.iFHBlocking = os.open(sPath,os.O_RDWR)
Run Code Online (Sandbox Code Playgroud)
理想情况下,我只需要在每个进程中构建一个具有相同路径的管道,并且它们能够说得很好.
我将跳过有关协议的内容,因为我认为这在很大程度上是不必要的.
所有读写操作都使用以下"基本"功能:
def base_read_blocking(self,iLen):
self.lock()
lBytes = os.read(self.iFHBlocking,iLen)
self.unlock()
return lBytes
def base_read(self,iLen):
print('entering base read')
self.lock()
lBytes = os.read(self.iFH,iLen)
self.unlock()
print('exiting base read')
return lBytes
def base_write_blocking(self,lBytes):
self.lock()
safe_write(self.iFHBlocking,lBytes)
self.unlock()
def base_write(self,lBytes):
print('entering base write')
self.lock()
safe_write(self.iFH,lBytes)
self.unlock()
print('exiting base write')
Run Code Online (Sandbox Code Playgroud)
另一篇文章中提出了safe_write
def safe_write(*args, **kwargs):
while True:
try:
return os.write(*args, **kwargs)
except OSError as e:
if e.errno == 35:
import time
print(".")
time.sleep(0.5)
else:
raise
Run Code Online (Sandbox Code Playgroud)
锁定和解锁的处理方式如下:
def lock(self):
print('locking...')
while True:
try:
os.mkdir(self.get_lock_dir())
print('...locked')
return
except OSError as e:
if e.errno != 17:
raise e
def unlock(self):
try:
os.rmdir(self.get_lock_dir())
except OSError as e:
if e.errno != 2:
raise e
print('unlocked')
Run Code Online (Sandbox Code Playgroud)
问题
这有时会发生:
....in base_read
lBytes = os.read(self.iFH,iLen)
OSError: [Errno 11] Resource temporarily unavailable
Run Code Online (Sandbox Code Playgroud)
有时它很好.
神奇的解决方案
我似乎已经阻止了这个问题的发生.请注意,这不是我回答我自己的问题.我的问题将在下一节中解释.
我将读取函数更改为更像这样,并将其排序:
def base_read(self,iLen):
while not self.ready_for_reading():
import time
print('.')
time.sleep(0.5)
lBytes = ''.encode('utf-8')
while len(lBytes)<iLen:
self.lock()
try:
lBytes += os.read(self.iFH,iLen)
except OSError as e:
if e.errno == 11:
import time
print('.')
time.sleep(0.5)
finally:
self.unlock()
return lBytes
def ready_for_reading(self):
lR,lW,lX = select.select([self.iFH,],[],[],self.iTimeout)
if not lR:
return False
lR,lW,lX = select.select([self.iFHBlocking],[],[],self.iTimeout)
if not lR:
return False
return True
Run Code Online (Sandbox Code Playgroud)
问题
我正在努力找出它暂时无法使用的确切原因.由于锁定机制,这两个进程无法同时访问实际的命名管道(除非我弄错了?)所以这是由于我的程序没有考虑到的更为基础的fifos?
我真正想要的只是一个解释......我找到的解决方案有效,但看起来很神奇.任何人都可以提供解释吗?
系统
| 归档时间: |
|
| 查看次数: |
28261 次 |
| 最近记录: |