我有一个简单的任务:
def worker(queue):
while True:
try:
_ = queue.get_nowait()
except Queue.Empty:
break
if __name__ == '__main__':
manager = multiprocessing.Manager()
# queue = multiprocessing.Queue()
queue = manager.Queue()
for i in range(5):
queue.put(i)
processes = []
for i in range(2):
proc = multiprocessing.Process(target=worker, args=(queue,))
processes.append(proc)
proc.start()
for proc in processes:
proc.join()
Run Code Online (Sandbox Code Playgroud)
似乎multiprocessing.Queue可以完成我需要的所有工作,但另一方面我看到很多manager().Queue()的例子并且无法理解我真正需要的东西.看起来像Manager().Queue()使用某种代理对象,但我不明白这些目的,因为multiprocessing.Queue()在没有任何代理对象的情况下执行相同的工作.
所以,我的问题是:
1)multiprocessing.manager()返回的多处理.Queue和object之间有什么区别.Queue()?
2)我需要使用什么?
我正在使用带有Tensorflow的Keras作为后端.
我试图在我的主进程中保存模型,然后model.predict在另一个进程中加载/运行(即调用).
我目前只是尝试从文档中保存/加载模型的天真方法:https://keras.io/getting-started/faq/#how-can-i-save-a-keras-model.
所以基本上:
model.save() 在主要过程中model = load_model() 在儿童过程中model.predict() 在儿童过程中但是,它只是挂在load_model电话上.
搜索周围我发现了这个可能相关的答案,表明Keras只能在一个过程中使用:与theano一起使用多处理但不确定这是否属实(似乎在这方面似乎找不到太多).
有没有办法实现我的目标?非常感谢高级别描述或简短示例.
注意:我尝试了将图形传递给流程的方法但是失败了,因为似乎张流图不可选(这里有相关的SO帖子:Tensorflow:将会话传递给python多进程).如果确实有一种方法可以将张量流图/模型传递给子进程,那么我也对此持开放态度.
谢谢!
python neural-network python-multiprocessing keras tensorflow
我有一个反应器从RabbitMQ代理获取消息并触发工作方法在进程池中处理这些消息,如下所示:
这是使用python实现的asyncio,loop.run_in_executor()和concurrent.futures.ProcessPoolExecutor.
现在我想使用SQLAlchemy访问worker方法中的数据库.大多数情况下,处理将是非常简单和快速的CRUD操作.
反应器在开始时每秒处理10-50条消息,因此不能为每个请求打开新的数据库连接.相反,我想在每个进程中维护一个持久连接.
我的问题是:我怎么能这样做?我可以将它们存储在全局变量中吗?SQA连接池是否会为我处理这个问题?当反应堆停止时如何清理?
[更新]
为什么选择带有进程池的模式?
当前实现使用不同的模式,其中每个使用者在其自己的线程中运行.不知何故,这不是很好.已经有大约200个消费者在他们自己的线程中运行,并且系统正在快速增长.为了更好地扩展,我们的想法是分离关注点并在I/O循环中使用消息并将处理委托给池.当然,整个系统的性能主要是I/O绑定.但是,处理大型结果集时CPU是一个问题.
另一个原因是"易用性".虽然消息的连接处理和消耗是异步实现的,但是worker中的代码可以是同步且简单的.
很快,很明显,通过工作者内部的持久网络连接访问远程系统是一个问题.这就是CommunicationChannels的用途:在worker中,我可以通过这些通道向消息总线发出请求.
我目前的一个想法是以类似的方式处理数据库访问:将语句通过队列传递到事件循环,然后将它们发送到数据库.但是,我不知道如何使用SQLAlchemy执行此操作.入口点在哪里?对象需要pickled在它们通过队列时传递.如何从SQA查询中获取此类对象?与数据库的通信必须异步工作,以免阻塞事件循环.我可以使用例如aiomysql作为SQA的数据库驱动程序吗?
python sqlalchemy rabbitmq python-asyncio python-multiprocessing
有人告诉我,日志记录不能用于多处理.在多处理混淆日志的情况下,您必须执行并发控制.
但我做了一些测试,似乎在使用多处理登录时没有问题
import time
import logging
from multiprocessing import Process, current_process, pool
# setup log
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S',
filename='/tmp/test.log',
filemode='w')
def func(the_time, logger):
proc = current_process()
while True:
if time.time() >= the_time:
logger.info('proc name %s id %s' % (proc.name, proc.pid))
return
if __name__ == '__main__':
the_time = time.time() + 5
for x in xrange(1, 10):
proc = Process(target=func, name=x, args=(the_time, logger))
proc.start()
Run Code Online (Sandbox Code Playgroud)
正如您从代码中看到的那样.
我故意让子进程在同一时刻(开始后5s)写日志以增加冲突的可能性.但是根本没有冲突.
所以我的问题是我们可以在多处理中使用日志记录吗?为什么这么多帖子说我们不能?
这是我第一次尝试并行处理,我一直在研究Dask,但实际编写它时遇到了麻烦.
我已经看过他们的示例和文档,我认为dask.delayed效果最好.我试图用延迟(function_name)包装我的函数,或者添加一个@delayed装饰器,但我似乎无法让它正常工作.我更喜欢Dask而不是其他方法,因为它是用python制作的,并且它(假设的)简单.我知道dask在for循环中不起作用,但是他们说它可以在循环中工作.
我的代码通过一个包含其他函数输入的函数传递文件,如下所示:
from dask import delayed
filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
for count, name in enumerate(filenames)"
name = name.split('.')[0]
....
Run Code Online (Sandbox Code Playgroud)
然后做一些预处理ex:
preprocess1, preprocess2 = delayed(read_files_and_do_some_stuff)(name)
Run Code Online (Sandbox Code Playgroud)
然后我调用一个构造函数并将pre_results传递给函数调用:
fc = FunctionCalls()
Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
input_data=pre_result1, model1=pre_result2)
Run Code Online (Sandbox Code Playgroud)
我在这里做的是将文件传递给for循环,进行一些预处理,然后将文件传递给两个模型.
关于如何并行化这个的想法或提示?我开始得到奇怪的错误,我不知道如何修复代码.代码确实有效.我使用了一堆pandas数据帧,系列和numpy数组,我宁愿不回去更改所有内容以使用dask.dataframes等.
我的评论中的代码可能难以阅读.这是一种更加格式化的方式.
在下面的代码中,当我输入print(mean_squared_error)时,我得到:延迟('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')
from dask import delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = ['file1.csv']
for count, name in enumerate(filenames):
file1 = pd.read_csv(name)
df = pd.DataFrame(file1)
prediction = df['Close'][:-1]
observed = df['Close'][1:]
mean_squared_error = delayed(mse)(observed, …Run Code Online (Sandbox Code Playgroud) parallel-processing multithreading python-3.x python-multiprocessing dask
我已经根据numpy文档创建了numpy ndarray的子类.特别是,我通过修改提供的代码添加了自定义属性.
我正在使用Python在并行循环中操作此类的实例multiprocessing.据我所知,范围基本上被"复制"到多个线程的方式正在使用pickle.
我现在遇到的问题与numpy数组被腌制的方式有关.我找不到任何关于此的综合文档,但是莳萝开发人员之间的一些讨论表明我应该关注这个__reduce__方法,这个方法被称为酸洗.
任何人都可以对此有所了解吗?最小的工作示例实际上只是我上面链接的numpy示例代码,为了完整性而复制到这里:
import numpy as np
class RealisticInfoArray(np.ndarray):
def __new__(cls, input_array, info=None):
# Input array is an already formed ndarray instance
# We first cast to be our class type
obj = np.asarray(input_array).view(cls)
# add the new attribute to the created instance
obj.info = info
# Finally, we must return the newly created object:
return obj
def __array_finalize__(self, obj):
# see InfoArray.__array_finalize__ for …Run Code Online (Sandbox Code Playgroud) 我已将程序(如下)写入:
pandas dataframegroupby使用特定列值拆分数据并存储为数据帧列表.multiprocess Pool.map()并行处理每个数据帧.一切都很好,该程序在我的小测试数据集上运行良好.但是,当我输入大数据(大约14 GB)时,内存消耗呈指数级增长,然后冻结计算机或被杀死(在HPC群集中).
一旦数据/变量无效,我就添加了代码来清除内存.一旦完成,我也正在关闭游泳池.仍然有14 GB的输入我只期望2*14 GB的内存负担,但似乎很多正在进行.我也尝试使用调整,chunkSize and maxTaskPerChild, etc但我没有看到测试与大文件的优化有任何区别.
我认为,当我开始时,在此代码位置需要对此代码进行改进multiprocessing.
p = Pool(3) # number of pool to run at once; default at 1
result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
但是,我发布了整个代码.
测试示例:我创建了一个高达250 mb的测试文件("genome_matrix_final-chr1234-1mb.txt")并运行该程序.当我检查系统监视器时,我可以看到内存消耗增加了大约6 GB.我不太清楚为什么250 mb文件加上一些输出需要这么大的内存空间.如果它有助于查看真正的问题,我通过下拉框共享该文件.https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0
有人可以建议,我怎么能摆脱这个问题?
我的python脚本:
#!/home/bin/python3
import pandas as pd
import collections
from multiprocessing import Pool
import io
import time
import resource
print()
print('Checking required modules')
print()
''' change this input file name and/or …Run Code Online (Sandbox Code Playgroud) 我正在使用python的多处理模块来生成新进程
如下 :
import multiprocessing
import os
d = multiprocessing.Process(target=os.system,args=('iostat 2 > a.txt',))
d.start()
Run Code Online (Sandbox Code Playgroud)
我想获取iostat命令的pid或使用多处理模块执行的命令
当我执行:
d.pid
Run Code Online (Sandbox Code Playgroud)
它给了我运行此命令的子shell的pid.
任何帮助都是有价值的.
提前致谢
我正在开发一个应用程序,它使用LevelDB并使用多个长期存在的进程来执行不同的任务.
由于LevelDB只允许单个进程维护数据库连接,因此我们所有的数据库访问都通过特殊的数据库进程进行汇集.
要从另一个进程访问数据库,我们使用a BaseProxy.但是因为我们使用asyncio我们的代理不应该阻塞这些调用db进程然后最终从db读取的API.因此,我们使用执行程序在代理上实现API.
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
thread_pool_executor,
self._callmethod,
method_name,
args,
)
Run Code Online (Sandbox Code Playgroud)
尽管这工作得很好,我不知道是否有到包裹一个更好的选择_callmethod的通话BaseProxy中ThreadPoolExecutor.
我理解它的方式,BaseProxy调用DB进程是等待IO的教科书示例,因此使用线程这似乎是不必要的浪费.
在一个完美的世界中,我假设async _acallmethod存在,BaseProxy但不幸的是,API不存在.
因此,我的问题基本归结为:在使用时,BaseProxy是否有更有效的替代方法来运行这些跨进程调用ThreadPoolExecutor?
我正在阅读python 文档中对两者的描述:
产卵
父进程启动一个新的 python 解释器进程。子进程将只继承运行进程对象 run() 方法所需的资源。特别是,父进程中不必要的文件描述符和句柄将不会被继承。与使用 fork 或 forkserver 相比,使用此方法启动进程相当慢。[在 Unix 和 Windows 上可用。Windows 和 macOS 上的默认设置。]
叉子
父进程使用 os.fork() 来派生 Python 解释器。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全地分叉多线程进程是有问题的。[仅在 Unix 上可用。Unix 上的默认设置。]
我的问题是: