作为数据工作流的一部分,我需要修改 dask 数据框列子集中的值并传递结果以进行进一步计算。特别是,我对两种情况感兴趣:映射列和映射分区。对数据采取行动的推荐安全且高效的方式是什么?我在每个主机上有多个工作进程的集群上运行分布式设置。
情况1。
我想跑:
res = dataframe.column.map(func, ...)
Run Code Online (Sandbox Code Playgroud)
这将返回一个数据系列,所以我假设原始数据帧没有被修改。将一列分配回数据框是否安全,例如dataframe['column']=res?可能不是。我应该使用.copy()制作一个副本,然后将结果分配给它,例如:
dataframe2 = dataframe.copy()
dataframe2['column'] = dataframe.column.map(func, ...)
Run Code Online (Sandbox Code Playgroud)
还有其他推荐的方法吗?
案例2
我需要映射数据帧的分区:
df.map_partitions(mapping_func, meta=df)
Run Code Online (Sandbox Code Playgroud)
在mapping_func() 中,我想通过使用partition[column].map或简单地创建列表理解来修改所选列中的值。同样,如何安全地修改分区并从映射函数中返回它?
映射函数收到的分区是 Pandas 数据帧(原始数据的副本?),但是在就地修改数据时,我看到了一些崩溃(尽管没有异常/错误消息)。调用也是一样partition.copy(deep=False),它不起作用。分区应该被深度复制然后就地修改吗?或者我应该总是从新的/映射的列数据和原始/未修改的系列/列中构建一个新的数据框?
[mapr@impetus-i0057 latest_code_deepak]$ dask-worker 172.26.32.37:8786
distributed.nanny - INFO - Start Nanny at: 'tcp://172.26.32.36:50930'
distributed.diskutils - WARNING - Found stale lock file and directory '/home/mapr/latest_code_deepak/dask-worker-space/worker-PwEseH', purging
distributed.worker - INFO - Start worker at: tcp://172.26.32.36:41694
distributed.worker - INFO - Listening to: tcp://172.26.32.36:41694
distributed.worker - INFO - bokeh at: 172.26.32.36:8789
distributed.worker - INFO - nanny at: 172.26.32.36:50930
distributed.worker - INFO - Waiting to connect to: tcp://172.26.32.37:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 8
distributed.worker - INFO - Memory: 33.52 GB …Run Code Online (Sandbox Code Playgroud) 我有一个较大的对象 (150 MB),我需要向所有 dask 分布式工作人员广播它,以便它可以在未来的任务中使用。我尝试了几种方法:
Client.scatter(broadcast=True):这需要从一台机器(我在其中运行客户端和调度程序)发送所有数据,这会造成带宽瓶颈。Client.submit其次是Client.replicate:这些工作人员共享一个文件系统,因此我可以安排加载数据的任务,然后将数据复制到所有工作人员,而不是发送数据。这似乎使用树策略来分发数据,这比之前的选项更快。但是,强制每个工作人员在本地运行加载数据功能可能会更快,而不是在一个工作人员上加载数据并将其序列化到工作人员之间。有没有办法做到这一点? Client.run似乎是我想要的一部分,但我需要为加载的数据取回一个未来,我可以稍后将其传递给其他任务。
我正在尝试将Wikipedia CirrusSearch 转储转换为由 450G 16 核 GCP 实例上的标题索引的 Parquet 支持的 dask 数据帧。CirrusSearch 转储以单个 json 行格式文件的形式出现。英文 Wipedia 转储包含 500 万个 recard,压缩为 12G,扩展为 90+G。一个重要的细节是记录并不完全平坦。
最简单的方法是
import json
import dask
from dask import bag as db, dataframe as ddf
from toolz import curried as tz
from toolz.curried import operator as op
blocksize=2**24
npartitions='auto'
parquetopts=dict(engine='fastparquet', object_encoding='json')
lang = 'en'
wiki = 'wiki'
date = 20180625
path='./'
source = f'{path}{lang}{wiki}-{date}-cirrussearch-content.json'
(
db
.read_text(source, blocksize=blocksize)
.map(json.loads)
.filter(tz.flip(op.contains, 'title'))
.to_dataframe()
.set_index('title', npartitions=npartitions)
.to_parquet(f'{lang}{wiki}-{date}-cirrussearch.pq', **parquetopts)
) …Run Code Online (Sandbox Code Playgroud) 我有一大组 sklearn 管道,我想与 Dask 并行构建。这是一个简单但幼稚的顺序方法:
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X_train, X_test, Y_train, Y_test = train_test_split(iris.data, iris.target, test_size=0.2)
pipe_nb = Pipeline([('clf', MultinomialNB())])
pipe_lr = Pipeline([('clf', LogisticRegression())])
pipe_rf = Pipeline([('clf', RandomForestClassifier())])
pipelines = [pipe_nb, pipe_lr, pipe_rf] # In reality, this would include many more different types of models with varying but specific parameters
for pl in pipelines:
pl.fit(X_train, Y_train) …Run Code Online (Sandbox Code Playgroud) import dask.distributed
def f(x, y):
return x, y
client = dask.distributed.Client()
client.map(f, [(1, 2), (2, 3)])
Run Code Online (Sandbox Code Playgroud)
不起作用。
[<Future: status: pending, key: f-137239e2f6eafbe900c0087f550bc0ca>,
<Future: status: pending, key: f-64f918a0c730c63955da91694fcf7acc>]
distributed.worker - WARNING - Compute Failed
Function: f
args: ((1, 2))
kwargs: {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)
distributed.worker - WARNING - Compute Failed
Function: f
args: ((2, 3))
kwargs: {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)
Run Code Online (Sandbox Code Playgroud) 我正在使用最轻量/简单的dask多处理,即非集群本地Client:
from distributed import Client
client = Client()
Run Code Online (Sandbox Code Playgroud)
即便如此:第一个调用实例会产生dask.bag.compute()以下结果:
Connected to pydev debugger (build 191.7141.48)
Traceback (most recent call last):
File "/Applications/PyCharm.app/Contents/helpers/pydev/_pydevd_bundle/pydevd_comm.py", line 383, in _on_run
r = self.sock.recv(1024)
OSError: [Errno 9] Bad file descriptor
Traceback (most recent call last):
File "/Applications/PyCharm.app/Contents/helpers/pydev/_pydevd_bundle/pydevd_comm.py", line 383, in _on_run
r = self.sock.recv(1024)
OSError: [Errno 9] Bad file descriptor
Traceback (most recent call last):
File "/Applications/PyCharm.app/Contents/helpers/pydev/_pydevd_bundle/pydevd_comm.py", line 383, in _on_run
r = self.sock.recv(1024)
OSError: [Errno 9] Bad file …Run Code Online (Sandbox Code Playgroud) 我有一个local_code.py文件中的函数,我想通过 dask 传递给工作人员。我在这里看到问题的答案说这可以使用该upload_file()函数来完成,但我似乎无法让它工作,因为我仍然得到一个ModuleNotFoundError.
代码的相关部分如下。
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from local_code import *
helper_file = '/absolute/path/to/local_code.py'
def main():
with SLURMCluster(**slurm_params) as cluster:
cluster.scale(n_workers)
with Client(cluster) as client:
client.upload_file(helper_file)
mapping = client.map(myfunc, data)
client.gather(mapping)
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
请注意,myfunc是从 导入的local_code,并且将其导入到地图没有错误。该函数myfunc还依赖于 中定义的其他函数local_code。
使用此代码,我仍然收到此错误
distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95+\x00\x00\x00\x00\x00\x00\x00\x8c\x11local_code\x94\x8c\x$
Traceback (most recent call last):
File "/home/gallagher.r/.local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 61, in loads
return pickle.loads(x) …Run Code Online (Sandbox Code Playgroud) 有 500 个,不断增长DataFrames,我想将(对于每个 DataFrame 独立的)数据的操作提交到dask. 我的主要问题是:可以dask保存不断提交的数据,所以我可以submit对所有提交的数据进行函数处理——而不仅仅是新提交的数据?
但是让我们用一个例子来解释它:
创建一个dask_server.py:
from dask.distributed import Client, LocalCluster
HOST = '127.0.0.1'
SCHEDULER_PORT = 8711
DASHBOARD_PORT = ':8710'
def run_cluster():
cluster = LocalCluster(dashboard_address=DASHBOARD_PORT, scheduler_port=SCHEDULER_PORT, n_workers=8)
print("DASK Cluster Dashboard = http://%s%s/status" % (HOST, DASHBOARD_PORT))
client = Client(cluster)
print(client)
print("Press Enter to quit ...")
input()
if __name__ == '__main__':
run_cluster()
Run Code Online (Sandbox Code Playgroud)
现在,我可以从我的连接my_stream.py,并开始submit和gather数据:
DASK_CLIENT_IP = '127.0.0.1'
dask_con_string = 'tcp://%s:%s' % (DASK_CLIENT_IP, DASK_CLIENT_PORT)
dask_client …Run Code Online (Sandbox Code Playgroud) 我有一个 Docker 应用程序,它在 Windows 上的笔记本电脑中运行良好,使用组合和启动容器的多个实例作为 Dask 集群。
服务的名称是“worker”,我启动了两个容器实例,如下所示:
docker compose up --scale worker=2
Run Code Online (Sandbox Code Playgroud)
我在 Azure 上部署了映像,当我运行 docker compose(使用我在 Windows 中使用的相同命令)时,只启动了一个容器。
如何在 Azure 中部署容器集群?我可以使用 docker compose 还是我需要采用不同的方法,例如使用模板或 Kubernetes 进行部署?
这是docker-compose.yml文件:
version: "3.0"
services:
web:
image: sofacr.azurecr.io/pablo:job2_v1
volumes:
- daskvol:/code/defaults_prediction
ports:
- "5000:5000"
environment:
- SCHEDULER_ADDRESS=scheduler
- SCHEDULER_PORT=8786
working_dir: /code
entrypoint:
- /opt/conda/bin/waitress-serve
command:
- --port=5000
- defaults_prediction:app
scheduler:
image: sofacr.azurecr.io/pablo:job2_v1
ports:
- "8787:8787"
entrypoint:
- /opt/conda/bin/dask-scheduler
worker:
image: sofacr.azurecr.io/pablo:job2_v1
depends_on:
- scheduler
environment:
- PYTHONPATH=/code
- SCHEDULER_ADDRESS=scheduler …Run Code Online (Sandbox Code Playgroud) dask ×10
dask-distributed ×10
python ×3
azure ×1
dask-delayed ×1
docker ×1
fastparquet ×1
parquet ×1
pycharm ×1
python-3.x ×1
scikit-learn ×1
streamz ×1