dan*_*451 5 python queue concurrency multiprocessing python-3.x
我有一些复杂的A类计算数据(大型矩阵计算),同时消耗B类的输入数据.
A本身使用多个核心.但是,当A需要下一个数据块时,它会等待相当长的一段时间,因为B在相同的主线程中运行.
由于A主要使用GPU进行计算,我希望B在CPU上同时收集数据.
我的最新方法是:
# every time *A* needs data
def some_computation_method(self):
data = B.get_data()
# start computations with data
Run Code Online (Sandbox Code Playgroud)
......和B看起来大致相同:
class B(object):
def __init__(self, ...):
...
self._queue = multiprocessing.Queue(10)
loader = multiprocessing.Process(target=self._concurrent_loader)
def _concurrent_loader(self):
while True:
if not self._queue.full():
# here: data loading from disk and pre-processing
# that requires access to instance variables
# like self.path, self.batch_size, ...
self._queue.put(data_chunk)
else:
# don't eat CPU time if A is too busy to consume
# the queue at the moment
time.sleep(1)
def get_data(self):
return self._queue.get()
Run Code Online (Sandbox Code Playgroud)
这种方法可以被视为"pythonic"解决方案吗?
由于我对Python的多处理模块没有多少经验,所以我构建了一个简单/简单的方法.然而,它对我来说看起来有点"hacky".
什么是更好的解决方案让B类同时从磁盘加载数据并通过某个队列提供它,而主线程运行繁重的计算并不时从队列中消耗数据?
虽然您的解决方案完全没问题,特别是对于“小型”项目,但它的缺点是线程与类紧密耦合B。因此,如果您(例如)出于某种原因想要B以非线程方式使用,那么您就不走运了。
我个人会以线程安全的方式编写该类,然后使用外部线程调用它:
class B(object):
def __init__(self):
self._queue = multiprocessing.Queue(10)
...
if __name__ == '__main__':
b = B()
loader = multiprocessing.Process(target=b._concurrent_loader)
loader.start()
Run Code Online (Sandbox Code Playgroud)
这使得B更加灵活,更好地分离依赖关系并且更容易测试。与在类创建时隐式发生的情况相比,它还通过显式地创建线程来使代码更具可读性。
| 归档时间: |
|
| 查看次数: |
826 次 |
| 最近记录: |