zen*_*000 3 distributed-computing dask
我正在整理一个概念证明,其中我想使用 PyCuda 在分布式环境中处理大型字符数据文件(每个任务一个文件中约 8GB) - 具体来说是 AWS。我知道 HDFS 会对数据文件进行分段并将其分发给工作人员,但我试图让我的环境尽可能简单,并且如果我不需要,我宁愿不必安装 Hadoop。
我最近观看了 Continuum Analytics 的一些关于他们的 Dask 框架的网络研讨会,看起来它可以完全满足我的需求。鉴于上述段落和 Dask 框架,当前对文件系统的推荐是什么?我是坚持使用 HDFS 还是有更好/更简单的解决方案?
大多数文件系统都提供仅读取文件一部分的能力,包括 HDFS、您的本地文件系统和 S3(AWS 实例的标准批量数据存储)。这允许并行计算框架(如Dask)将大文件分割成许多较小的位,工作人员可以并行处理这些位。
对于大多数使用情况下,这是自动发生的幕后(用户read_text和read_csv需要刻意担心这一点。)这听起来像你有一个自定义的文件格式,所以我会带你到read_bytes的功能。对于 S3,其工作原理如下:
from dask.bytes import read_bytes
sample, partitions = read_bytes('s3://bucket/keys.*.foo',
blocksize=100000000)
Run Code Online (Sandbox Code Playgroud)
Sample 将是一个简短partitions的10kB数据样本,并将是一个dask.delayed对象列表,您可以将其与常规 for 循环一起使用以构建您的计算。
如果您的数据具有某种您希望 dask 尊重的分隔符,您可以使用delimiter=关键字参数提供它。
相同的功能适用于其他系统,例如您的本地文件系统或 HDFS(如果您已经安装并导入了hdfs3和distributed)。
sample, partitions = read_bytes('local://bucket/keys.*.foo', blocksize=100000000)
sample, partitions = read_bytes('hdfs://bucket/keys.*.foo')
Run Code Online (Sandbox Code Playgroud)
例如,这是我们如何实施的一个不正确但说明性的版本 dask.dataframe.read_csv
from dask import delayed
import pandas as pd
import dask.dataframe as dd
def read_csv(path, **kwargs):
sample, partitions = read_bytes(path, blocksize=100000000, delimiter=b'\n')
dataframes = [delayed(pd.read_csv)(part, **kwargs) for part in partitions]
return dd.from_delayed(dataframes)
Run Code Online (Sandbox Code Playgroud)
这是不正确的,因为pd.read_csv实际上想要一个 BytesIO 对象,我们没有稳健地处理关键字参数,并且我们没有很好地管理样本(列、dtypes 等)中的数据帧元数据。这些细节妨碍了一般观点虽然并且可能超出了这个问题的兴趣。
人们一直将这个问题作为对“如何从 S3 读取数据”这一更普遍问题的回答。大多数人不使用该read_bytes界面,这有点低级。相反,大多数用户可能希望使用以下高级功能之一:
import dask.bag as db
records = db.read_text('s3://bucket/keys.*.json').map(json.loads)
import dask.dataframe as dd
df = dd.read_csv('s3://bucket/keys.*.csv')
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1627 次 |
| 最近记录: |