截至2017年8月,不幸的是,Pandas DataFame.apply()仅限于使用单核,这意味着多核机器在运行时将浪费大部分计算时间df.apply(myfunc, axis=1).
如何使用所有核心并行运行应用于数据帧?
我目前正在使用Pandas和Spark进行数据分析.我发现Dask提供了并行化的NumPy数组和Pandas DataFrame.
Pandas在Python中进行数据分析非常简单直观.但由于系统内存有限,我发现难以在Pandas中处理多个更大的数据帧.
简单回答:
Apache Spark是一个包含分布式计算,SQL查询,机器学习等在JVM上运行的全包框架,通常与Hadoop等其他大数据框架共同部署....通常Dask比Spark更小,重量更轻.
我从http://dask.pydata.org/en/latest/spark.html了解下面的详细信息
我从以下链接了解有关Dask的更多信息 https://www.continuum.io/blog/developer-blog/high-performance-hadoop-anaconda-and-dask-your-cluster
http://dask.pydata.org/en/latest/dataframe-overview.html
限制
Dask.DataFrame不实现整个Pandas接口.期望这样的用户会感到失望.但是,dask.dataframe有以下限制:
感谢Dask开发人员.这似乎是非常有前途的技术.
总的来说,我可以理解Dask比spark更容易使用.Dask与Pandas一样灵活,具有更大的计算能力和更多的CPU.
我理解关于Dask的所有上述事实.
那么,使用Dask大致可以处理多少数据量(以TB为单位)?
如何在使用Python保存在磁盘上的稀疏CSR数组的块上并行应用某些函数?顺序地,这可以例如通过保存CSR阵列并且joblib.dump打开它joblib.load(.., mmap_mode="r")并逐个处理行的块来完成.使用dask可以更有效地完成这项工作吗?
特别是,假设一个人不需要在稀疏数组上完成所有可能的核心操作,而只需要并行加载行块(每个块是一个CSR数组)并对它们应用一些函数(在我的情况下它会例如estimator.predict(X)来自scikit-learn).
此外,磁盘上是否有适合此任务的文件格式?Joblib有效,但我不确定作为内存映射加载的CSR数组的(并行)性能; spark.mllib似乎使用一些自定义稀疏存储格式(似乎没有纯Python解析器)或LIBSVM格式(根据我的经验,scikit-learn中的解析器比它慢得多joblib.dump)...
注意:我在https://github.com/dask/dask/上阅读了文档,有关它的各种问题,但我仍然不确定如何最好地解决这个问题.
编辑:为了给出一个更实际的例子,下面是在密码数组的dask中工作的代码,但在使用带有此错误的稀疏数组时失败,
import numpy as np
import scipy.sparse
import joblib
import dask.array as da
from sklearn.utils import gen_batches
np.random.seed(42)
joblib.dump(np.random.rand(100000, 1000), 'X_dense.pkl')
joblib.dump(scipy.sparse.random(10000, 1000000, format='csr'), 'X_csr.pkl')
fh = joblib.load('X_dense.pkl', mmap_mode='r')
# computing the results without dask
results = np.vstack((fh[sl, :].sum(axis=1)) for sl in gen_batches(fh.shape[0], batch_size))
# computing the results with dask
x = da.from_array(fh, chunks=(2000)) …Run Code Online (Sandbox Code Playgroud) 我最近发现dask模块旨在成为一个易于使用的python并行处理模块.对我来说最大的卖点是它与熊猫一起使用.
在其手册页上阅读了一下之后,我找不到办法完成这个简单的可并行化任务:
ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply
Run Code Online (Sandbox Code Playgroud)
目前,要在dask,AFAIK实现这一目标,
ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame
Run Code Online (Sandbox Code Playgroud)
这是一种丑陋的语法,实际上比直接慢
df.apply(func, axis = 1) # for pandas DF row apply
Run Code Online (Sandbox Code Playgroud)
有什么建议吗?
编辑:感谢@MRocklin的地图功能.它似乎比普通的大熊猫慢.这是与熊猫GIL发布问题有关还是我做错了?
import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s …Run Code Online (Sandbox Code Playgroud) pandas read_csv函数似乎没有稀疏选项.我有csv数据,其中有大量零(它压缩得非常好,并且剥离任何0值都会将其减少到几乎原始大小的一半).
我已经尝试将它加载到密集矩阵中read_csv然后调用to_sparse,但是它需要很长时间并且在文本字段上窒息,尽管大多数数据都是浮点数.如果我pandas.get_dummies(df)先调用将分类列转换为1和0,那么调用to_sparse(fill_value=0)它需要花费大量时间,比我预期的大得多的数字表有更长的时间,大多数为零.即使我从原始文件中删除零并调用to_sparse()(以使填充值为NaN),也会发生这种情况.这也恰好不管我是否通过kind='block'或kind='integer'.
除了手工构建稀疏数据帧之外,是否有一种好的,平滑的方式可以直接加载稀疏的csv而不会占用大量不必要的内存?
下面是一些代码,用于创建具有3列浮点数据和一列文本数据的样本数据集.大约85%的浮点值为零,CSV的总大小约为300 MB,但您可能希望将其放大以真正测试内存约束.
np.random.seed(123)
df=pd.DataFrame( np.random.randn(10000000,3) , columns=list('xyz') )
df[ df < 1.0 ] = 0.0
df['txt'] = np.random.choice( list('abcdefghij'), size=len(df) )
df.to_csv('test.csv',index=False)
Run Code Online (Sandbox Code Playgroud)
这是一种简单的阅读方式,但希望有更好,更有效的方法:
sdf = pd.read_csv( 'test.csv', dtype={'txt':'category'} ).to_sparse(fill_value=0.0)
Run Code Online (Sandbox Code Playgroud)
编辑添加(来自JohnE): 如果可能,请在答案中提供有关读取大型CSV的一些相对性能统计数据,包括有关如何测量内存效率的信息(特别是因为内存效率比时钟时间更难测量).特别要注意的是,如果内存效率更高,那么较慢的(时钟时间)答案可能是最佳答案.
这是我第一次尝试并行处理,我一直在研究Dask,但实际编写它时遇到了麻烦.
我已经看过他们的示例和文档,我认为dask.delayed效果最好.我试图用延迟(function_name)包装我的函数,或者添加一个@delayed装饰器,但我似乎无法让它正常工作.我更喜欢Dask而不是其他方法,因为它是用python制作的,并且它(假设的)简单.我知道dask在for循环中不起作用,但是他们说它可以在循环中工作.
我的代码通过一个包含其他函数输入的函数传递文件,如下所示:
from dask import delayed
filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
for count, name in enumerate(filenames)"
name = name.split('.')[0]
....
Run Code Online (Sandbox Code Playgroud)
然后做一些预处理ex:
preprocess1, preprocess2 = delayed(read_files_and_do_some_stuff)(name)
Run Code Online (Sandbox Code Playgroud)
然后我调用一个构造函数并将pre_results传递给函数调用:
fc = FunctionCalls()
Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
input_data=pre_result1, model1=pre_result2)
Run Code Online (Sandbox Code Playgroud)
我在这里做的是将文件传递给for循环,进行一些预处理,然后将文件传递给两个模型.
关于如何并行化这个的想法或提示?我开始得到奇怪的错误,我不知道如何修复代码.代码确实有效.我使用了一堆pandas数据帧,系列和numpy数组,我宁愿不回去更改所有内容以使用dask.dataframes等.
我的评论中的代码可能难以阅读.这是一种更加格式化的方式.
在下面的代码中,当我输入print(mean_squared_error)时,我得到:延迟('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')
from dask import delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = ['file1.csv']
for count, name in enumerate(filenames):
file1 = pd.read_csv(name)
df = pd.DataFrame(file1)
prediction = df['Close'][:-1]
observed = df['Close'][1:]
mean_squared_error = delayed(mse)(observed, …Run Code Online (Sandbox Code Playgroud) parallel-processing multithreading python-3.x python-multiprocessing dask
我有以下问题
我有一个包含句子的数据框主文件,例如
master
Out[8]:
original
0 this is a nice sentence
1 this is another one
2 stackoverflow is nice
Run Code Online (Sandbox Code Playgroud)
对于Master中的每一行,我使用查找到另一个Dataframe 从站以获得最佳匹配fuzzywuzzy.我使用fuzzywuzzy,因为两个数据帧之间的匹配句子可能有点不同(额外的字符等).
例如,奴隶可能是
slave
Out[10]:
my_value name
0 2 hello world
1 1 congratulations
2 2 this is a nice sentence
3 3 this is another one
4 1 stackoverflow is nice
Run Code Online (Sandbox Code Playgroud)
这是一个功能齐全,精彩,紧凑的工作示例:)
from fuzzywuzzy import fuzz
import pandas as pd
import numpy as np
import difflib
master= pd.DataFrame({'original':['this is a nice sentence', …Run Code Online (Sandbox Code Playgroud) 新的dask,1GB当我在dask数据帧中读取它时,我有一个CSV文件,它在我写入文件的更改后创建了大约50个分区,它创建了与分区一样多的文件.
有没有办法将所有分区写入单个CSV文件,是否有办法访问分区?
谢谢.
假设我有pandas数据帧:
df=pd.DataFrame({'a':[1,2,3],'b':[4,5,6]})
Run Code Online (Sandbox Code Playgroud)
当我将它转换为dask数据帧时应该name和divisions参数包括:
from dask import dataframe as dd
sd=dd.DataFrame(df.to_dict(),divisions=1,meta=pd.DataFrame(columns=df.columns,index=df.index))
Run Code Online (Sandbox Code Playgroud)
TypeError:init()缺少1个必需的位置参数:'name'
编辑:假设我创建了一个pandas数据框,如:
pd.DataFrame({'a':[1,2,3],'b':[4,5,6]})
Run Code Online (Sandbox Code Playgroud)
同样如何创建DASK数据帧,因为它需要三个额外的参数作为name,divisions和meta.
sd=dd.Dataframe({'a':[1,2,3],'b':[4,5,6]},name=,meta=,divisions=)
Run Code Online (Sandbox Code Playgroud)
谢谢您的回复.
我有一个dask dataframe按索引(first_name)分组.
import pandas as pd
import numpy as np
from multiprocessing import cpu_count
from dask import dataframe as dd
from dask.multiprocessing import get
from dask.distributed import Client
NCORES = cpu_count()
client = Client()
entities = pd.DataFrame({'first_name':['Jake','John','Danae','Beatriz', 'Jacke', 'Jon'],'last_name': ['Del Toro', 'Foster', 'Smith', 'Patterson', 'Toro', 'Froster'], 'ID':['X','U','X','Y', '12','13']})
df = dd.from_pandas(entities, npartitions=NCORES)
df = client.persist(df.set_index('first_name'))
Run Code Online (Sandbox Code Playgroud)
(显然entities在现实生活中有几千行)
我想将用户定义的函数应用于每个分组的数据帧.我想将每一行与组中的所有其他行进行比较(类似于Pandas将每行与数据帧中的所有行进行比较,并将结果保存在每行的列表中).
以下是我尝试应用的功能:
def contraster(x, DF):
matches = DF.apply(lambda row: fuzz.partial_ratio(row['last_name'], x) >= 50, axis …Run Code Online (Sandbox Code Playgroud) dask ×10
python ×8
pandas ×7
scipy ×2
apache-spark ×1
bigdata ×1
dataframe ×1
fuzzywuzzy ×1
joblib ×1
numpy ×1
python-3.x ×1