当我使用IPython.parallel时,如何强制远程内核重新启动?
例如,在普通的IPython笔记本中,我可以直接从工具栏重启内核.我的问题是当我有远程内核时,如何强制执行相同的操作?
我为Python 3安装了IPython 3,以便与Jupyterhub一起使用.
现在我可以使用带有Python2内核的笔记本,因为我创建了 /usr/local/share/jupyter/kernels/python2/kernel.json
有:
{
"argv": ["python2", "-m", "IPython.kernel",
"-f", "{connection_file}"],
"display_name": "Python 2",
"language": "python2"
}
Run Code Online (Sandbox Code Playgroud)
现在我也想使用IPython.parallel,但是当我启动一个集群时它将自动启动Python 3中的引擎,我该如何将其更改为Python 2?
我已将所有代码复制到我所有引擎计算机上的工作目录中.我的代码是:
my_test.py
my_startegy.py
main.py
Run Code Online (Sandbox Code Playgroud)
所以,main.py将继续运行Client Machine,代码main.py是:
from ipyparallel import Client
import my_test
import my_strategy as strategy
class Beck_Test_Parallel(object):
"""
"""
def __init__(self):
self.rc = None
self.dview = None
def start_client(self, path):
self.rc = Client(path)
self.dview = self.rc[:]
#self.dview.push(dict(
# Account=my_test.Account,
# dataImport=my_test.dataImport
# ))
def parallel_map(self, deal_function, accounts):
import my_test
return self.dview.map_sync(deal_function, accounts)
def create_accounts(time_list, account):
accounts = []
for index, time in enumerate(time_list):
acc = my_test.Account(
strategy.start,
strategy.end,
strategy.freq,
strategy.universe_code,
strategy.capital_base,
strategy.short_capital,
strategy.benchmark, …Run Code Online (Sandbox Code Playgroud) 有些文档分散在Jupyter或ipyparallel下,但没有一篇文章说明从头到尾的整个过程.我真的很困惑.谁有经验可以分享?
这是关于我的环境的检查.
$ipcluster --version
5.2.0
$which -a ipcluster
/home/etlolap/anaconda3/bin/ipcluster
/user/bin/cluster
$head -n 1 $(which ipcluster)
#!/home/etlolap/anaconda3/bin/python
Run Code Online (Sandbox Code Playgroud)
我可以看到Jupyter中的Clusters选项卡,但它仍然提示要求安装ipyparallel.

要在Jupyter Notebook中启用IPython Clusters选项卡:
ipcluster nbextension enable
Run Code Online (Sandbox Code Playgroud)
但是我得到了以下问题,因此没有用.看起来nbextension甚至不是一个有效的参数.
No subcommand specified. Must specify one of: ['start', 'stop', 'engines'].
Run Code Online (Sandbox Code Playgroud) 我正在尝试并行GridSearchCV化scikit-learn. 它在一个jupyter (hub) notebook环境中运行。经过一番研究,我发现了这段代码:
from sklearn.externals.joblib import Parallel, parallel_backend, register_parallel_backend
from ipyparallel import Client
from ipyparallel.joblib import IPythonParallelBackend
c = Client(profile='myprofile')
print(c.ids)
bview = c.load_balanced_view()
register_parallel_backend('ipyparallel', lambda : IPythonParallelBackend(view=bview))
grid = GridSearchCV(pipeline, cv=3, n_jobs=4, param_grid=param_grid)
with parallel_backend('ipyparallel'):
grid.fit(X_train, Y_train)
Run Code Online (Sandbox Code Playgroud)
请注意,我已将n_jobs参数设置为4,机器的 cpu 核心数是多少。(这就是nproc返回的结果)
但它似乎不起作用:ImportError: cannot import name 'register_parallel_backend',尽管我安装了 joblibconda install joblib并且也尝试过pip install -U joblib。
那么,在这种环境中并行化的最佳方法是什么GridSearchCV?
更新:
不带参数ipyparallel且仅设置n_jobs参数: …
python scikit-learn ipython-parallel jupyter jupyter-notebook
是否有可能将魔术命令%autoreload用于远程ipengines?
以下作品:
dv.execute('a=blah.test()')
dv.get('a')
5
Run Code Online (Sandbox Code Playgroud)
然后我改变等等回到8:
dv.execute('reload(blah)')
dv.execute('a=blah.test()')
dv.get('a')
8
Run Code Online (Sandbox Code Playgroud)
但我希望能做的是:
dv.execute('%load_ext autoreload')
dv.execute('%autoreload 2')
Run Code Online (Sandbox Code Playgroud)
并跳过重新加载步骤.但是,当我尝试此操作时,更改不会生效,并且blah.test()仍然使用旧版本的代码.为了确保我没有对execute命令做错,我也尝试使用magic函数:
%px %load_ext autoreload
%px %autoreload 2
Run Code Online (Sandbox Code Playgroud)
哪个仍然不给我自动加载功能.
谢谢.
我正在尝试将IPython Parallel用于一个非常常见的场景,我想在运行Sun Grid Engine的集群上运行模拟,我找不到可行的方法来执行此操作.
这是我想要做的:
我想用几个不同的参数值运行数值模拟(使用Numpy数组) - 这些任务显然是''令人尴尬'并行.我有权(通过ssh)访问运行Grid Engine的集群的头节点.直到现在,我使用QSUB命令运行shell脚本,但这非常笨拙(处理节点崩溃等),我正在寻找一种方法来解决所有这些问题.
IPython似乎非常适合这种情况,但是让设置顺利运行变得非常麻烦.我在头节点上使用IPCLUSTER启动n(比方说20个)引擎,然后将.json文件复制到我使用IPython.parallel.Client连接的本地机器.
我已 IPClusterStart.controller_launcher_class = 'SGEControllerLauncher'
和IPClusterEngines.engine_launcher_class = 'SGEEngineSetLauncher'
IPCLUSTER似乎运行良好; 我从ssh终端的头节点获得此输出:
-- [IPClusterStart] Starting Controller with SGEControllerLauncher
-- [IPClusterStart] Job submitted with job id: '143396'
-- [IPClusterStart] Starting 4 Engines with SGEEngineSetLauncher
-- [IPClusterStart] Job submitted with job id: '143397'
-- [IPClusterStart] Engines appear to have started successfully
Run Code Online (Sandbox Code Playgroud)
但是,我有这些问题:
很多时候,即使我看到上面的消息说发动机已成功启动,许多发动机也无法向控制器注册.当我用20个引擎启动IPCLUSTER时,我可以看到10-15个引擎出现在Grid Engine队列上.我不知道其他引擎会发生什么 - 没有输出文件.在这些10-15个引擎中,只有一些引用了控制器注册,我在输出文件中看到了这个:
... [IPEngineApp] Using existing profile dir: .../.ipython/profile_sge'
... [IPEngineApp] Loading url_file ... .ipython/profile_sge/security/ipcontroller-engine.json'
... [IPEngineApp] Registering with controller …Run Code Online (Sandbox Code Playgroud)我正在尝试在使用 IPython 并行时设置日志记录。具体来说,我想将日志消息从引擎重定向到客户端。因此,不是每个引擎都单独记录到自己的日志文件中,就像在IPython.parallel 中一样 - 我可以将自己的日志写入引擎日志吗?,我正在寻找类似如何在 Python 中使用多处理时进行日志记录?
基于查看 IPython 代码库,我的印象是这样做的方法是使用日志记录模块注册 zmq.log.hander.PUBHandler(请参阅 iploggerapp.py 中的文档)。我以各种方式尝试过这个,但似乎没有一个工作。我还尝试通过 IPython.parallel.util 注册一个记录器。connect_engine_logger,但这似乎也没有做任何事情。
更新
我在这个问题上取得了一些进展。如果我在 ipengine_config c.IPEngineApp.log_url 中指定,则 IPython 应用程序的记录器具有相应的 EnginePubHandler。我通过
%%px
from IPython.config import Application
log = Application.instance().log
print(log.handlers)
Run Code Online (Sandbox Code Playgroud)
这表明应用程序记录器对每个引擎都有一个 EnginePUBHandler。接下来,我可以在单独的终端中启动 iplogger 应用程序并查看来自每个引擎的日志消息。
但是,我想要实现的是在笔记本中而不是在单独的终端中查看这些日志消息。我曾尝试通过系统调用从笔记本中启动 iplogger,但它崩溃了。
我需要使用我的模型在 python 中批量和并行地进行预测。如果我加载模型并在常规 for 循环中创建数据框并使用 predict 函数,它就没有问题。如果我在 python 中使用 multiprocessing 并行创建不相交的数据帧,然后使用 predict 函数 for 循环无限期冻结。为什么会出现这种行为?
这是我的代码片段:
with open('models/model_test.pkl', 'rb') as fin:
pkl_bst = pickle.load(fin)
def predict_generator(X):
df = X
print(df.head())
df = (df.groupby(['user_id']).recommender_items.apply(flat_map)
.reset_index().drop('level_1', axis=1))
df.columns = ['user_id', 'product_id']
print('Merge Data')
user_lookup = pd.read_csv('data/user_lookup.csv')
product_lookup = pd.read_csv('data/product_lookup.csv')
product_map = dict(zip(product_lookup.product_id, product_lookup.name))
print(user_lookup.head())
df = pd.merge(df, user_lookup, on=['user_id'])
df = pd.merge(df, product_lookup, on=['product_id'])
df = df.sort_values(['user_id', 'product_id'])
users = df.user_id.values
items = df.product_id.values
df.drop(['user_id', 'product_id'], axis=1, inplace=True)
print('Prediction Step')
prediction = …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用MPI启动ipyparallel集群.
ipcluster_config具有如下修改的以下行:
c.MPILauncher.mpi_cmd = ['mpiexec']
c.MPIControllerLauncher.controller_args = ['--ip=*']
c.MPILauncher.mpi_args = ["-machinefile", "~/mpi_hosts"]
Run Code Online (Sandbox Code Playgroud)
ipcontroller_config.py配置如下:
c.HubFactory.engine_ip = '*'
c.HubFactory.ip = '*'
c.HubFactory.client_ip = '*'
Run Code Online (Sandbox Code Playgroud)
但是,当我使用命令启动集群时,
ipcluster start --profile mpi -n 2
它会失败并显示以下消息
Engines shutdown early, they probably failed to connect.
You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args
Run Code Online (Sandbox Code Playgroud)
不确定如何进一步调试.
ipython-parallel ×10
ipython ×6
python ×5
jupyter ×3
lightgbm ×1
logging ×1
mpi ×1
python-2.7 ×1
scikit-learn ×1