我有一些代码需要针对可能挂起或有不受我控制的问题的其他几个系统运行.我想使用python的多处理来生成子进程独立于主程序运行,然后当它们挂起或有问题终止它们时,但我不确定最好的方法来解决这个问题.
当终止被调用时,它确实会终止子进程,但它会变成一个已经失效的僵尸,直到进程对象消失才会被释放.循环永远不会结束的下面的示例代码可以杀死它并在再次调用时允许重新生成,但似乎不是一个很好的解决方法(即multiprocessing.Process()在__init __()中会更好.
有人有建议吗?
class Process(object):
def __init__(self):
self.thing = Thing()
self.running_flag = multiprocessing.Value("i", 1)
def run(self):
self.process = multiprocessing.Process(target=self.thing.worker, args=(self.running_flag,))
self.process.start()
print self.process.pid
def pause_resume(self):
self.running_flag.value = not self.running_flag.value
def terminate(self):
self.process.terminate()
class Thing(object):
def __init__(self):
self.count = 1
def worker(self,running_flag):
while True:
if running_flag.value:
self.do_work()
def do_work(self):
print "working {0} ...".format(self.count)
self.count += 1
time.sleep(1)
Run Code Online (Sandbox Code Playgroud) 我试图在python中编写一个简单的工作证明nonce-finder.
def proof_of_work(b, nBytes):
nonce = 0
# while the first nBytes of hash(b + nonce) are not 0
while sha256(b + uint2bytes(nonce))[:nBytes] != bytes(nBytes):
nonce = nonce + 1
return nonce
Run Code Online (Sandbox Code Playgroud)
现在我尝试进行多处理,因此它可以使用所有CPU内核并更快地找到nonce.我的想法是multiprocessing.Pool多次使用和执行函数proof_of_work,传递两个参数num_of_cpus_running,this_cpu_id如下所示:
def proof_of_work(b, nBytes, num_of_cpus_running, this_cpu_id):
nonce = this_cpu_id
while sha256(b + uint2bytes(nonce))[:nBytes] != bytes(nBytes):
nonce = nonce + num_of_cpus_running
return nonce
Run Code Online (Sandbox Code Playgroud)
所以,如果有4个核心,每个核心将计算这样的随机数:
core 0: 0, 4, 8, 16, 32 ...
core 1: 1, 5, 9, 17, 33 ...
core 2: 2, …Run Code Online (Sandbox Code Playgroud) 我想在子进程中限制资源访问.例如 - 限制http下载,磁盘io等.我如何实现它扩展这个基本代码?
请分享一些基本的代码示例.
pool = multiprocessing.Pool(multiprocessing.cpu_count())
while job_queue.is_jobs_for_processing():
for job in job_queue.pull_jobs_for_processing:
pool.apply_async(do_job, callback = callback)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud) 我试图以这种方式使用python的多处理包:
featureClass = [[1000,k,1] for k in drange(start,end,step)] #list of arguments
for f in featureClass:
pool .apply_async(worker, args=f,callback=collectMyResult)
pool.close()
pool.join
Run Code Online (Sandbox Code Playgroud)
从池的进程我想避免等待超过60秒的那些返回其结果.那可能吗?
我通过HTTP连接到本地服务器(OSRM)以提交路由并返回驱动时间.我注意到I/O比线程慢,因为似乎计算的等待时间小于发送请求和处理JSON输出所花费的时间(我认为当服务器需要一些时间时I/O更好处理你的请求 - >你不希望它被阻止,因为你必须等待,这不是我的情况).线程受全局解释器锁的影响,因此我看来(以及下面的证据)我最快的选择是使用多处理.
多处理的问题是它太快以至于耗尽了我的套接字并且我收到了一个错误(请求每次都发出一个新的连接).我可以(在串行中)使用requests.Sessions()对象来保持连接活动,但是我不能让它并行工作(每个进程都有它自己的会话).
我目前最接近的代码就是这个多处理代码:
conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())
def ReqOsrm(url_input):
ul, qid = url_input
try:
response = conn_pool.request('GET', ul)
json_geocode = json.loads(response.data.decode('utf-8'))
status = int(json_geocode['status'])
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from, used_to = json_geocode['via_points']
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
return out
else:
print("Done but no route: %d %s" % (qid, req_url))
return [qid, 999, 0, 0, 0, 0, 0, 0]
except Exception as err:
print("%s: %d %s" % …Run Code Online (Sandbox Code Playgroud) python concurrency multithreading asynchronous python-multiprocessing
我有一个使用Python multiprocessing模块生成多个工作人员的实用程序,我希望能够通过优秀的memory_profiler实用程序跟踪他们的内存使用情况,这可以完成我想要的一切 - 特别是随着时间的推移采样内存使用情况并绘制最终结果(I我不关心这个问题的逐行内存分析.
为了设置这个问题,我创建了一个更简单的脚本版本,它有一个worker函数,它分配的内存类似于库中给出的示例memory_profiler.工人如下:
import time
X6 = 10 ** 6
X7 = 10 ** 7
def worker(num, wait, amt=X6):
"""
A function that allocates memory over time.
"""
frame = []
for idx in range(num):
frame.extend([1] * amt)
time.sleep(wait)
del frame
Run Code Online (Sandbox Code Playgroud)
鉴于4名工人的顺序工作量如下:
if __name__ == '__main__':
worker(5, 5, X6)
worker(5, 2, X7)
worker(5, 5, X6)
worker(5, 2, X7)
Run Code Online (Sandbox Code Playgroud)
运行mprof可执行文件以配置我的脚本需要70秒,让每个工作程序一个接一个地运行.该脚本运行如下:
$ mprof run python myscript.py
Run Code Online (Sandbox Code Playgroud)
生成以下内存使用情况图:
让这些工作程序同时进行,multiprocessing意味着脚本的完成速度与最慢的工作程序一样慢(25秒).该脚本如下:
import multiprocessing …Run Code Online (Sandbox Code Playgroud) 我已经编写了一个神经网络分类器,该分类器可以获取海量图像(每张图像约1-3 GB),对其进行修补,然后分别将修补程序通过网络。培训的进行非常缓慢,因此我对其进行了基准测试,发现花大约50 秒钟的时间将补丁从一个图像加载到内存中(使用Openslide库),而仅需0.5秒钟就可以将它们通过模型。
但是,我正在使用具有1.5Tb RAM的超级计算机,其中仅使用了约26 Gb。数据集总计约500Gb。我的想法是,如果我们可以将整个数据集加载到内存中,它将极大地加快训练速度。但是我正在与一个研究团队合作,我们正在多个Python脚本之间进行实验。因此,理想情况下,我想将一个脚本中的整个数据集加载到内存中,并能够在所有脚本中对其进行访问。
更多细节:
.tif格式存储。我发现了很多关于如何在多个Python脚本之间共享Python对象或内存中的原始数据的文章:
多处理模块中具有SyncManager和BaseManager的服务器进程| 示例1 | 示例2 | Docs-服务器进程 | 文件-SyncManager
Manager对象在发送对象之前先对其进行腌制,这可能会使事情变慢。mmap将文件映射到虚拟内存,而不是物理内存 -它会创建一个临时文件。适用于Python 的sysv_ipc模块。这个演示看起来很有希望。
multi-processing模块中可用的功能?我还在Python中找到了IPC /网络选项列表。
有些人讨论服务器-客户端设置,有些人讨论序列化/反序列化,这恐怕会比从磁盘读取花费更多的时间。我找不到任何答案可以解决我的问题,这些答案是否会导致I / O性能的提高。
我们不仅需要在脚本之间共享Python对象/内存;我们需要在Docker容器之间共享它们。
Docker 文档--ipc很好地解释了该标志。根据文档的运行情况,对我来说有意义的是:
docker run …Run Code Online (Sandbox Code Playgroud) 我有一个multiprocessing程序在哪里
multiprocessing.Manager().list())我multiprocessing.Lock()在添加到列表或从中删除时实现了锁定(via ).由于有一个"馈线"过程和几个(10-40个)"消费者"过程都在争夺锁定,并且消费者流程很快,我最终得到的"馈线"过程很难获得锁定.
获得锁定时是否存在"优先级"的概念?我希望"馈线"流程能够比其他流程更优先获得它.
现在我通过让"消费者"进程在尝试获取锁定之前等待一段随机时间来缓解这个问题,而"馈送"进程就在那里(当它结束时设置一个标志).这是一个有效的解决方法,但它很丑陋并且几乎没有效果(我有进程等待random.random()*n几秒钟,n进程数量在哪里.这是一个完全编号,可能是错误的).
有一些令人恐惧的严格的API文档(读:ZERO)multiprocessing.pool.ApplyResult.该多解释DOC会谈约 ApplyResult S,但没有具体定义.
multiprocessing.pool.Pool虽然Python多处理指南似乎更好地涵盖了它,但这似乎也适用.
即使ApplyResult help()结果也很微不足道:
| get(self, timeout=None)
|
| ready(self)
|
| successful(self)
|
| wait(self, timeout=None)
Run Code Online (Sandbox Code Playgroud)
Get()而Ready()我得到的.那很好.
我完全不知道是什么wait(),因为你正在处理一个"池",人们会认为它会在get()通话中等待你.这是"等待结果,但现在不能得到它"或者它是OS风格的等待?如果是这样,那甚至意味着什么?
我同样不确定是什么successful().
我正在使用带有张量流的Keras作为后端.我有一个编译/训练的模型.
我的预测循环很慢,所以我想找到一种方法来并行化predict_proba调用以加快速度.我想获取批次(数据)列表,然后是每个可用的gpu,运行model.predict_proba()这些批次的子集.
实质上:
data = [ batch_0, batch_1, ... , batch_N ]
on gpu_0 => return predict_proba(batch_0)
on gpu_1 => return predict_proba(batch_1)
...
on gpu_N => return predict_proba(batch_N)
Run Code Online (Sandbox Code Playgroud)
我知道纯Tensorflow可以将ops分配给给定的gpu(https://www.tensorflow.org/tutorials/using_gpu).但是,我不知道这是如何转化为我的情况,因为我使用Keras的api构建/编译/训练了我的模型.
我曾经想过,也许我只需要使用python的多处理模块并开始运行每个gpu的进程predict_proba(batch_n).我知道这在理论上是可能的,因为我的另一个SO帖子:Keras + Tensorflow和Python中的多处理.然而,这仍然让我不知道如何实际"选择"一个gpu来操作这个过程.
我的问题归结为:当使用Tensorflow作为Keras的后端时,如何将Keras中的一个模型的预测与多个gpus并行化?
另外,我很好奇是否只有一个gpu可以进行类似的预测并行化.
高级描述或代码示例将不胜感激!
谢谢!
python ×10
asynchronous ×1
concurrency ×1
cryptography ×1
docker ×1
ipc ×1
keras ×1
nonce ×1
python-2.6 ×1
python-2.x ×1
python-3.x ×1
sha256 ×1
tensorflow ×1