Pyl*_*der 6 python numpy pandas dask
我正在努力尝试将程序转换为可并行化/多线程与优秀的dask库.这是我正在进行转换的程序:
import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask.array as da
from io import StringIO
test_data = '''id,transaction_dt,units,measures
1,2018-01-01,4,30.5
1,2018-01-03,4,26.3
2,2018-01-01,3,12.7
2,2018-01-03,3,8.8'''
df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])
df_test = df_test.loc[np.repeat(df_test.index, df_test['units'])]
df_test['transaction_dt'] += pd.to_timedelta(df_test.groupby(level=0).cumcount(), unit='d')
df_test = df_test.reset_index(drop=True)
Run Code Online (Sandbox Code Playgroud)
预期成绩:
id,transaction_dt,measures
1,2018-01-01,30.5
1,2018-01-02,30.5
1,2018-01-03,30.5
1,2018-01-04,30.5
1,2018-01-03,26.3
1,2018-01-04,26.3
1,2018-01-05,26.3
1,2018-01-06,26.3
2,2018-01-01,12.7
2,2018-01-02,12.7
2,2018-01-03,12.7
2,2018-01-03,8.8
2,2018-01-04,8.8
2,2018-01-05,8.8
Run Code Online (Sandbox Code Playgroud)
在我看来,这可能是尝试并行化的一个很好的候选者,因为单独的dask分区不需要知道彼此之间的任何事情来完成所需的操作.这是我认为它可能如何工作的天真表示:
dd_test = dd.from_pandas(df_test, npartitions=3)
dd_test = dd_test.loc[da.repeat(dd_test.index, dd_test['units'])]
dd_test['transaction_dt'] += dd_test.to_timedelta(dd.groupby(level=0).cumcount(), unit='d')
dd_test = dd_test.reset_index(drop=True)
Run Code Online (Sandbox Code Playgroud)
到目前为止,我一直在尝试解决以下错误或惯用的差异:
2. dask不支持变异运算符:"+ ="
3.没有dask .to_timedelta()参数
4.没有dask .cumcount()(但我认为.cumsum()可以互换?!)
如果有任何dask专家可以让我知道是否有根本障碍阻止我尝试这个或任何实施技巧,那将是一个很大的帮助!
编辑:
自从发布问题以来,我认为我在这方面取得了一些进展:
dd_test = dd.from_pandas(df_test, npartitions=3)
dd_test['helper'] = 1
dd_test = dd_test.loc[da.repeat(dd_test.index, dd_test['units'])]
dd_test['transaction_dt'] = dd_test['transaction_dt'] + (dd.test.groupby('id')['helper'].cumsum()).astype('timedelta64[D]')
dd_test = dd_test.reset_index(drop=True)
Run Code Online (Sandbox Code Playgroud)
但是,我仍然坚持dask数组重复错误.任何提示仍然欢迎.
小智 2
不确定这是否正是您正在寻找的,但我用 np.repeat 替换了 da.repeat,以及显式转换dd_test.index
和dd_test['units']
numpy 数组,最后添加dd_test['transaction_dt'].astype('M8[us]')
到您的 timedelta 计算中。
df_test = pd.read_csv(StringIO(test_data), sep=',')
dd_test = dd.from_pandas(df_test, npartitions=3)
dd_test['helper'] = 1
dd_test = dd_test.loc[np.repeat(np.array(dd_test.index),
np.array(dd_test['units']))]
dd_test['transaction_dt'] = dd_test['transaction_dt'].astype('M8[us]') + (dd_test.groupby('id')['helper'].cumsum()).astype('timedelta64[D]')
dd_test = dd_test.reset_index(drop=True)
df_expected = dd_test.compute()
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1686 次 |
最近记录: |