标签: dask

如何在一台机器上使用所有内核的Pandas Dataframes并行化apply()?

截至2017年8月,不幸的是,Pandas DataFame.apply()仅限于使用单核,这意味着多核机器在运行时将浪费大部分计算时间df.apply(myfunc, axis=1).

如何使用所有核心并行运行应用于数据帧?

pandas dask

68
推荐指数
6
解决办法
3万
查看次数

在什么情况下我可以使用Dask而不是Apache Spark?

我目前正在使用Pandas和Spark进行数据分析.我发现Dask提供了并行化的NumPy数组和Pandas DataFrame.

Pandas在Python中进行数据分析非常简单直观.但由于系统内存有限,我发现难以在Pandas中处理多个更大的数据帧.

简单回答:

Apache Spark是一个包含分布式计算,SQL查询,机器学习等在JVM上运行的全包框架,通常与Hadoop等其他大数据框架共同部署....通常Dask比Spark更小,重量更轻.

我从http://dask.pydata.org/en/latest/spark.html了解下面的详细信息

  • Dask重量轻
  • Dask通常在单个计算机上使用,但也可以在分布式群集上运行良好.
  • Dask提供并行数组,数据帧,机器学习和自定义算法
  • Dask对Python用户有一个优势,因为它本身就是一个Python库,因此当出现问题时进行序列化和调试会更顺利.
  • Dask放弃了高级别的理解,允许用户表达更复杂的并行算法.
  • Dask重量更轻,更易于集成到现有代码和硬件中.
  • 如果你想要一个可以完成所有事情并且你已经在大数据硬件上的项目,那么Spark是一个安全的选择
  • Spark通常用于中小型集群,但也可在单台机器上运行良好.

我从以下链接了解有关Dask的更多信息 https://www.continuum.io/blog/developer-blog/high-performance-hadoop-anaconda-and-dask-your-cluster

  • 如果您在使用Pandas,NumPy或其他使用Python的计算时遇到内存问题,存储限制或单个计算机上的CPU边界,Dask可以帮助您扩展单个计算机上的所有核心,或者向外扩展在群集中的所有核心和内存上.
  • Dask在一台机器上运行良好,可以利用笔记本电脑上的所有内核并处理大于内存的数据
  • 在具有数百个节点的群集上弹性地弹性扩展.
  • Dask使用Python本地工作,具有不同格式和存储系统的数据,包括Hadoop分布式文件系统(HDFS)和Amazon S3.Anaconda和Dask可以与您现有的企业Hadoop发行版配合使用,包括Cloudera CDH和Hortonworks HDP.

http://dask.pydata.org/en/latest/dataframe-overview.html

限制

Dask.DataFrame不实现整个Pandas接口.期望这样的用户会感到失望.但是,dask.dataframe有以下限制:

  1. 从未排序的列设置新索引非常昂贵
  2. 许多操作,例如groupby-apply和join on unsorted columns,需要设置索引,如上所述,索引很昂贵
  3. Pandas API非常庞大.Dask.dataframe不会尝试实现许多pandas功能或任何更奇特的数据结构,如NDFrame

感谢Dask开发人员.这似乎是非常有前途的技术.

总的来说,我可以理解Dask比spark更容易使用.Dask与Pandas一样灵活,具有更大的计算能力和更多的CPU.

我理解关于Dask的所有上述事实.

那么,使用Dask大致可以处理多少数据量(以TB为单位)?

python bigdata pandas apache-spark dask

63
推荐指数
1
解决办法
2万
查看次数

稀疏CSR阵列的核外处理

如何在使用Python保存在磁盘上的稀疏CSR数组的块上并行应用某些函数?顺序地,这可以例如通过保存CSR阵列并且joblib.dump打开它joblib.load(.., mmap_mode="r")并逐个处理行的块来完成.使用dask可以更有效地完成这项工作吗?

特别是,假设一个人不需要在稀疏数组上完成所有可能的核心操作,而只需要并行加载行块(每个块是一个CSR数组)并对它们应用一些函数(在我的情况下它会例如estimator.predict(X)来自scikit-learn).

此外,磁盘上是否有适合此任务的文件格式?Joblib有效,但我不确定作为内存映射加载的CSR数组的(并行)性能; spark.mllib似乎使用一些自定义稀疏存储格式(似乎没有纯Python解析器)或LIBSVM格式(根据我的经验,scikit-learn中的解析器比它慢得多joblib.dump)...

注意:我在https://github.com/dask/dask/上阅读了文档,有关它的各种问题,但我仍然不确定如何最好地解决这个问题.

编辑:为了给出一个更实际的例子,下面是在密码数组的dask中工作的代码,但在使用带有此错误的稀疏数组时失败,

import numpy as np
import scipy.sparse

import joblib
import dask.array as da
from sklearn.utils import gen_batches

np.random.seed(42)
joblib.dump(np.random.rand(100000, 1000), 'X_dense.pkl')
joblib.dump(scipy.sparse.random(10000, 1000000, format='csr'), 'X_csr.pkl')

fh = joblib.load('X_dense.pkl', mmap_mode='r')

# computing the results without dask
results = np.vstack((fh[sl, :].sum(axis=1)) for sl in gen_batches(fh.shape[0], batch_size))

# computing the results with dask
x = da.from_array(fh, chunks=(2000)) …
Run Code Online (Sandbox Code Playgroud)

python scipy joblib dask apache-spark-mllib

39
推荐指数
1
解决办法
1320
查看次数

python dask DataFrame,支持(平凡可并行化)行适用?

我最近发现dask模块旨在成为一个易于使用的python并行处理模块.对我来说最大的卖点是它与熊猫一起使用.

在其手册页上阅读了一下之后,我找不到办法完成这个简单的可并行化任务:

ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply
Run Code Online (Sandbox Code Playgroud)

目前,要在dask,AFAIK实现这一目标,

ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame
Run Code Online (Sandbox Code Playgroud)

这是一种丑陋的语法,实际上比直接慢

df.apply(func, axis = 1) # for pandas DF row apply
Run Code Online (Sandbox Code Playgroud)

有什么建议吗?

编辑:感谢@MRocklin的地图功能.它似乎比普通的大熊猫慢.这是与熊猫GIL发布问题有关还是我做错了?

import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)

def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing pandas dask

36
推荐指数
1
解决办法
1万
查看次数

以高效的内存方式将大型csv读入稀疏的pandas数据帧

pandas read_csv函数似乎没有稀疏选项.我有csv数据,其中有大量零(它压缩得非常好,并且剥离任何0值都会将其减少到几乎原始大小的一半).

我已经尝试将它加载到密集矩阵中read_csv然后调用to_sparse,但是它需要很长时间并且在文本字段上窒息,尽管大多数数据都是浮点数.如果我pandas.get_dummies(df)先调用将分类列转换为1和0,那么调用to_sparse(fill_value=0)它需要花费大量时间,比我预期的大得多的数字表有更长的时间,大多数为零.即使我从原始文件中删除零并调用to_sparse()(以使填充值为NaN),也会发生这种情况.这也恰好不管我是否通过kind='block'kind='integer'.

除了手工构建稀疏数据帧之外,是否有一种好的,平滑的方式可以直接加载稀疏的csv而不会占用大量不必要的内存?


下面是一些代码,用于创建具有3列浮点数据和一列文本数据的样本数据集.大约85%的浮点值为零,CSV的总大小约为300 MB,但您可能希望将其放大以真正测试内存约束.

np.random.seed(123)
df=pd.DataFrame( np.random.randn(10000000,3) , columns=list('xyz') )
df[ df < 1.0 ] = 0.0
df['txt'] = np.random.choice( list('abcdefghij'), size=len(df) )
df.to_csv('test.csv',index=False)
Run Code Online (Sandbox Code Playgroud)

这是一种简单的阅读方式,但希望有更好,更有效的方法:

sdf = pd.read_csv( 'test.csv', dtype={'txt':'category'} ).to_sparse(fill_value=0.0)
Run Code Online (Sandbox Code Playgroud)

编辑添加(来自JohnE): 如果可能,请在答案中提供有关读取大型CSV的一些相对性能统计数据,包括有关如何测量内存效率的信息(特别是因为内存效率比时钟时间更难测量).特别要注意的是,如果内存效率更高,那么较慢的(时钟时间)答案可能是最佳答案.

python numpy scipy pandas dask

24
推荐指数
2
解决办法
3770
查看次数

Dask:如何将我的代码与dask延迟并行化?

这是我第一次尝试并行处理,我一直在研究Dask,但实际编写它时遇到了麻烦.

我已经看过他们的示例和文档,我认为dask.delayed效果最好.我试图用延迟(function_name)包装我的函数,或者添加一个@delayed装饰器,但我似乎无法让它正常工作.我更喜欢Dask而不是其他方法,因为它是用python制作的,并且它(假设的)简单.我知道dask在for循环中不起作用,但是他们说它可以在循环中工作.

我的代码通过一个包含其他函数输入的函数传递文件,如下所示:

from dask import delayed
filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
for count, name in enumerate(filenames)"
    name = name.split('.')[0]
    ....
Run Code Online (Sandbox Code Playgroud)

然后做一些预处理ex:

    preprocess1, preprocess2 = delayed(read_files_and_do_some_stuff)(name)
Run Code Online (Sandbox Code Playgroud)

然后我调用一个构造函数并将pre_results传递给函数调用:

    fc = FunctionCalls()
    Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
                             input_data=pre_result1, model1=pre_result2)
Run Code Online (Sandbox Code Playgroud)

我在这里做的是将文件传递给for循环,进行一些预处理,然后将文件传递给两个模型.

关于如何并行化这个的想法或提示?我开始得到奇怪的错误,我不知道如何修复代码.代码确实有效.我使用了一堆pandas数据帧,系列和numpy数组,我宁愿不回去更改所有内容以使用dask.dataframes等.

我的评论中的代码可能难以阅读.这是一种更加格式化的方式.

在下面的代码中,当我输入print(mean_squared_error)时,我得到:延迟('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')

from dask import delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = ['file1.csv']

for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = delayed(mse)(observed, …
Run Code Online (Sandbox Code Playgroud)

parallel-processing multithreading python-3.x python-multiprocessing dask

21
推荐指数
2
解决办法
9296
查看次数

如何在Pandas中使用apply来并行化许多(模糊)字符串比较?

我有以下问题

我有一个包含句子的数据框主文件,例如

master
Out[8]: 
                  original
0  this is a nice sentence
1      this is another one
2    stackoverflow is nice
Run Code Online (Sandbox Code Playgroud)

对于Master中的每一行,我使用查找到另一个Dataframe 从站以获得最佳匹配fuzzywuzzy.我使用fuzzywuzzy,因为两个数据帧之间的匹配句子可能有点不同(额外的字符等).

例如,奴隶可能是

slave
Out[10]: 
   my_value                      name
0         2               hello world
1         1           congratulations
2         2  this is a nice sentence 
3         3       this is another one
4         1     stackoverflow is nice
Run Code Online (Sandbox Code Playgroud)

这是一个功能齐全,精彩,紧凑的工作示例:)

from fuzzywuzzy import fuzz
import pandas as pd
import numpy as np
import difflib


master= pd.DataFrame({'original':['this is a nice sentence', …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing pandas fuzzywuzzy dask

19
推荐指数
1
解决办法
6347
查看次数

将Dask分区写入单个文件

新的dask,1GB当我在dask数据帧中读取它时,我有一个CSV文件,它在我写入文件的更改后创建了大约50个分区,它创建了与分区一样多的文件.
有没有办法将所有分区写入单个CSV文件,是否有办法访问分区?
谢谢.

python dask

19
推荐指数
2
解决办法
5927
查看次数

将Pandas数据帧转换为Dask数据帧

假设我有pandas数据帧:

df=pd.DataFrame({'a':[1,2,3],'b':[4,5,6]})
Run Code Online (Sandbox Code Playgroud)

当我将它转换为dask数据帧时应该namedivisions参数包括:

from dask import dataframe as dd 
sd=dd.DataFrame(df.to_dict(),divisions=1,meta=pd.DataFrame(columns=df.columns,index=df.index))
Run Code Online (Sandbox Code Playgroud)

TypeError:init()缺少1个必需的位置参数:'name'

编辑:假设我创建了一个pandas数据框,如:

pd.DataFrame({'a':[1,2,3],'b':[4,5,6]})
Run Code Online (Sandbox Code Playgroud)

同样如何创建DASK数据帧,因为它需要三个额外的参数作为name,divisionsmeta.

sd=dd.Dataframe({'a':[1,2,3],'b':[4,5,6]},name=,meta=,divisions=)
Run Code Online (Sandbox Code Playgroud)

谢谢您的回复.

python data-conversion dataframe pandas dask

19
推荐指数
1
解决办法
2万
查看次数

将函数应用于Dask中的分组数据框:如何在函数中将分组的Dataframe指定为参数?

我有一个dask dataframe按索引(first_name)分组.

import pandas as pd
import numpy as np

from multiprocessing import cpu_count

from dask import dataframe as dd
from dask.multiprocessing import get 
from dask.distributed import Client


NCORES = cpu_count()
client = Client()

entities = pd.DataFrame({'first_name':['Jake','John','Danae','Beatriz', 'Jacke', 'Jon'],'last_name': ['Del Toro', 'Foster', 'Smith', 'Patterson', 'Toro', 'Froster'], 'ID':['X','U','X','Y', '12','13']})

df = dd.from_pandas(entities, npartitions=NCORES)
df = client.persist(df.set_index('first_name'))
Run Code Online (Sandbox Code Playgroud)

(显然entities在现实生活中有几千行)

我想将用户定义的函数应用于每个分组的数据帧.我想将每一行与组中的所有其他行进行比较(类似于Pandas将每行与数据帧中的所有行进行比较,并将结果保存在每行的列表中).

以下是我尝试应用的功能:

def contraster(x, DF):
    matches = DF.apply(lambda row: fuzz.partial_ratio(row['last_name'], x) >= 50, axis …
Run Code Online (Sandbox Code Playgroud)

python pandas dask

19
推荐指数
2
解决办法
1159
查看次数