Jas*_*918 6 python pandas dask
在pandas中,我使用下面的典型模式将矢量化函数应用于df并返回多个值.当所述函数从单个任务产生多个独立输出时,这实际上是必需的.看看我过于琐碎的例子:
import pandas as pd
df = pd.DataFrame({'val1': [1, 2, 3, 4, 5],
'val2': [1, 2, 3, 4, 5]})
def myfunc(in1, in2):
out1 = in1 + in2
out2 = in1 * in2
return (out1, out2)
df['out1'], df['out2'] = zip(*df.apply(lambda x: myfunc(x['val1'], x['val2']), axis=1))
Run Code Online (Sandbox Code Playgroud)
目前我编写了一个单独的函数来对pandas df进行分块并使用多处理来提高效率,但我想使用dask来完成此任务.继续这个例子,下面是我如何在使用dask时运行向量化函数来返回单个值:
import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=2)
def simple_func(in1, in2):
out1 = in1 + in2
return out1
df['out3'] = ddf.map_partitions(lambda x: simple_func(x['val1'], x['val2']), meta=(None, 'i8')).compute()
Run Code Online (Sandbox Code Playgroud)
现在我想使用dask并返回两个值,如pandas示例中所示.我试图向meta添加一个列表并返回一个元组,但只是得到错误.在dask中这是可能的吗?
我认为这里的问题源于您组合结果的方式不是很好。理想情况下,您将df.apply与result_expand参数一起使用,然后使用df.merge. 将此代码从 Pandas 移植到 Dask 是微不足道的。对于熊猫,这将是:
import pandas as pd
def return_two_things(x, y):
return (
x + y,
x * y,
)
def pandas_wrapper(row):
return return_two_things(row['val1'], row['val2'])
df = pd.DataFrame({
'val1': range(1, 6),
'val2': range(1, 6),
})
res = df.apply(pandas_wrapper, axis=1, result_type='expand')
res.columns = ['out1', 'out2']
full = df.merge(res, left_index=True, right_index=True)
print(full)
Run Code Online (Sandbox Code Playgroud)
哪些输出:
val1 val2 out1 out2
0 1 1 2 1
1 2 2 4 4
2 3 3 6 9
3 4 4 8 16
4 5 5 10 25
Run Code Online (Sandbox Code Playgroud)
对于 Dask,将函数应用于数据并整理结果实际上是相同的:
import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=2)
# here 0 and 1 refer to the default column names of the resulting dataframe
res = ddf.apply(pandas_wrapper, axis=1, result_type='expand', meta={0: int, 1: int})
# which are renamed out1, and out2 here
res.columns = ['out1', 'out2']
# this merge is considered "embarrassingly parallel", as a worker does not need to contact
# any other workers when it is merging the results (that it created) with the input data it used.
full = ddf.merge(res, left_index=True, right_index=True)
print(full.compute())
Run Code Online (Sandbox Code Playgroud)
输出:
val1 val2 out1 out2
0 1 1 2 1
1 2 2 4 4
2 3 3 6 9
3 4 4 8 16
4 5 5 10 25
Run Code Online (Sandbox Code Playgroud)
聚会迟到了。也许在提出问题时这是不可能的。
我不喜欢结尾的分配模式。据我所知,dask 不允许像 pandas 那样进行新的列分配。
您需要将元值设置为您要返回的基本类型。根据我的测试,您可以非常简单地返回字典、元组、集合或列表。元实际上似乎并不关心类型是否与返回对象的类型匹配。
import pandas
import dask.dataframe
def myfunc(in1, in2):
out1 = in1 + in2
out2 = in1 * in2
return (out1, out2)
df = pandas.DataFrame({'val1': [1, 2, 3, 4, 5],
'val2': [1, 2, 3, 4, 5]})
ddf = dask.dataframe.from_pandas(df, npartitions=2)
df['out1'], df['out2'] = zip(*df.apply(lambda x: myfunc(x['val1'], x['val2']), axis=1))
output = ddf.map_partitions(lambda part: part.apply(lambda x: myfunc(x['val1'], x['val2']), axis=1), meta=tuple).compute()
out1, out2 = zip(*output)
ddf = ddf.assign(out1 = pandas.Series(out1))
ddf = ddf.assign(out2 = pandas.Series(out2))
print('\nPandas\n',df)
print('\nDask\n',ddf.compute())
print('\nEqual\n',ddf.eq(df).compute().all())
Run Code Online (Sandbox Code Playgroud)
输出:
Pandas
val1 val2 out1 out2
0 1 1 2 1
1 2 2 4 4
2 3 3 6 9
3 4 4 8 16
4 5 5 10 25
Dask
val1 val2 out1 out2
0 1 1 2 1
1 2 2 4 4
2 3 3 6 9
3 4 4 8 16
4 5 5 10 25
Equal
val1 True
val2 True
out1 True
out2 True
dtype: bool
Run Code Online (Sandbox Code Playgroud)
值得注意的是,map_partition 的 lambda 返回是较大数据帧的分区(在本例中,基于您的 npartitions 值)。然后,您可以像使用 .apply() 处理任何其他数据帧一样对待它。