ana*_*ine 5 python numpy pandas dask
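I'm using Dask to read a CSV of 10M+ rows and perform some calculations. So far it has proven to be about 10x faster than pandas.
I have a piece of code below that works fine with pandas but raises a TypeError with Dask, and I'm not sure how to get past it. It seems that with Dask the select function passes an array back to the dataframe/column, but with pandas it doesn't? I don't want to switch the whole thing back to pandas and lose the 10x performance advantage.
This question grew out of some help from other people on Stack Overflow, but I think it has drifted far enough from the original question to be a completely different one. The code is below.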
PANDAS: time taken, excluding AndHeathSolRadFact: 40 seconds
import pandas as pd
import numpy as np
from timeit import default_timer as timer
start = timer()
df = pd.read_csv(r'C:\Users\i5-Desktop\Downloads\Weathergrids.csv')
df['DateTime'] = pd.to_datetime(df['Date'], format='%Y-%d-%m %H:%M')
df['Month'] = df['DateTime'].dt.month
df['Grass_FMC'] = (97.7+4.06*df['RH'])/(df['Temperature']+6)-0.00854*df['RH']+3000/df['Curing']-30
df["AndHeathSolRadFact"] = np.select(
    [
        (df['Month'].between(8,12)),
        (df['Month'].between(1,2) & (df['CloudCover']>30))  # parentheses needed: & binds tighter than >
    ], #list of conditions
    [1, 1], #list of results
    default=0) #default if no match
print(df.head())
#print(df.tail())
end = timer()
print(end - start)
DASK: time taken, excluding AndHeathSolRadFact: 4 seconds
import dask.dataframe as dd  # Dask DataFrames implement the pandas API
import pandas as pd
import numpy as np
from timeit import default_timer as timer
start = timer()
ddf = dd.read_csv(r'C:\Users\i5-Desktop\Downloads\Weathergrids.csv')
ddf['DateTime'] = dd.to_datetime(ddf['Date'], format='%Y-%d-%m %H:%M')
ddf['Month'] = ddf['DateTime'].dt.month
ddf['Grass_FMC'] = (97.7+4.06*ddf['RH'])/(ddf['Temperature']+6)-0.00854*ddf['RH']+3000/ddf['Curing']-30
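ddf["AndHeathSolRadFact"] = np.select(  # np.select returns a numpy.ndarray, which Dask rejects (TypeError below)
    [
        (ddf['Month'].between(8,12)),
        (ddf['Month'].between(1,2) & (ddf['CloudCover']>30))  # parentheses needed: & binds tighter than >
    ], #list of conditions
    [1, 1], #list of results
    default=0) #default if no match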
print(ddf.head())
#print(ddf.tail())
end = timer()
print(end - start)
Error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-50-86c08f38bce6> in <module>
29 ], #list of conditions
30 [1, 1], #list of results
---> 31 default=0) #default if no match
32
33
~\Anaconda3\lib\site-packages\dask\dataframe\core.py in __setitem__(self, key, value)
3276 df = self.assign(**{k: value for k in key})
3277 else:
-> 3278 df = self.assign(**{key: value})
3279
3280 self.dask = df.dask
~\Anaconda3\lib\site-packages\dask\dataframe\core.py in assign(self, **kwargs)
3510 raise TypeError(
3511 "Column assignment doesn't support type "
-> 3512 "{0}".format(typename(type(v)))
3513 )
3514 if callable(v):
TypeError: Column assignment doesn't support type numpy.ndarray
Sample Weathergrids CSV
Location,Date,Temperature,RH,WindDir,WindSpeed,DroughtFactor,Curing,CloudCover
1075,2019-20-09 04:00,6.8,99.3,143.9,5.6,10.0,93.0,1.0
1075,2019-20-09 05:00,6.4,100.0,93.6,7.2,10.0,93.0,1.0
1075,2019-20-09 06:00,6.7,99.3,130.3,6.9,10.0,93.0,1.0
1075,2019-20-09 07:00,8.6,95.4,68.5,6.3,10.0,93.0,1.0
1075,2019-20-09 08:00,12.2,76.0,86.4,6.1,10.0,93.0,1.0
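The Date column in this sample is year-day-month, which is why both versions of the code use the format string '%Y-%d-%m %H:%M'. A minimal pandas sketch, with the first two sample rows inlined via io.StringIO purely for illustration, shows how those rows parse and what the question's Grass_FMC formula gives for them:

```python
import io

import pandas as pd

# First two rows of the sample Weathergrids CSV, inlined for illustration
SAMPLE = """Location,Date,Temperature,RH,WindDir,WindSpeed,DroughtFactor,Curing,CloudCover
1075,2019-20-09 04:00,6.8,99.3,143.9,5.6,10.0,93.0,1.0
1075,2019-20-09 05:00,6.4,100.0,93.6,7.2,10.0,93.0,1.0
"""

df = pd.read_csv(io.StringIO(SAMPLE))

# Dates are year-DAY-month, so "2019-20-09" is 20 September 2019
df["DateTime"] = pd.to_datetime(df["Date"], format="%Y-%d-%m %H:%M")
df["Month"] = df["DateTime"].dt.month

# Same Grass_FMC formula as in the question
df["Grass_FMC"] = (
    (97.7 + 4.06 * df["RH"]) / (df["Temperature"] + 6)
    - 0.00854 * df["RH"]
    + 3000 / df["Curing"]
    - 30
)

print(df[["DateTime", "Month", "Grass_FMC"]])
```

Both rows land in month 9 (the first row's Grass_FMC is about 40.54), so they fall under the August to December condition of the select logic.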
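I just ran into a similar problem, and I got it working by converting the ndarray into a Dask array. I also had to make sure the number of partitions matched between the Dask DataFrame and the Dask array created from the ndarray.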
ana*_*ine -1
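This answer isn't elegant, but it works.
I found that the select function runs about 20 seconds faster in pandas on the 11M-row dataset. I also found that even when I run the same function in Dask, the result comes back as a NumPy array, just as it does in pandas. Dask essentially can't accept that, but data frames can be moved between Dask and pandas.
So I get the benefit of Dask for loading and date conversion (4 seconds vs. 40 seconds in pandas) and the benefit of pandas for the select (40 seconds vs. 60 seconds in Dask), and I just have to accept that I'll use more memory.
Converting between the two kinds of data frame costs almost no time.
Finally, I had to make sure I cleaned up the data frames, because Python wasn't freeing memory between test runs; it just kept accumulating.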
import dask.dataframe as dd  # Dask DataFrames implement the pandas API
import pandas as pd
import numpy as np
from timeit import default_timer as timer
start = timer()
ddf = dd.read_csv(r'C:\Users\i5-Desktop\Downloads\Weathergrids.csv')
#print(ddf.describe(include='all'))
#Wrangle the dates so we can interrogate them
ddf['DateTime'] = dd.to_datetime(ddf['Date'], format='%Y-%d-%m %H:%M')
ddf['Month'] = ddf['DateTime'].dt.month
#Grass Fuel Moisture Content
ddf['Grass_FMC'] = (97.7+4.06*ddf['RH'])/(ddf['Temperature']+6)-0.00854*ddf['RH']+3000/ddf['Curing']-30
#Convert to a Pandas DataFrame because dask was being slow with the select logic below
df = ddf.compute()
del [ddf]
#ddf["AndHeathSolRadFact"] = np.select(
#Solar Radiation Factor - this seems to take 32 seconds. Why?
df["AndHeathSolRadFact"] = np.select(
    [
        (df['Month'].between(8,12)),
        (df['Month'].between(1,2) & (df['CloudCover']>30))  # parentheses needed: & binds tighter than >
    ], #list of conditions
    [1, 1], #list of results
    default=0) #default if no match
#Convert back to a Dask dataframe because we want that juicy parallelism
ddf2 = dd.from_pandas(df,npartitions=4)
del [df]
print(ddf2.head())
#print(ddf.tail())
end = timer()
print(end - start)
#Clean up remaining dataframes
del [[ddf2]]