kua*_*anb 5 python parallel-processing dask
在下面的代码片段中,我希望日志打印数字0 - 4.我知道数字可能不是那个顺序,因为任务将被分解为许多并行操作.
代码段:
from dask import dataframe as dd
import numpy as np
import pandas as pd
df = pd.DataFrame({'A': np.arange(5),
'B': np.arange(5),
'C': np.arange(5)})
ddf = dd.from_pandas(df, npartitions=1)
def aggregate(x):
print('B val received: ' + str(x.B))
return x
ddf.apply(aggregate, axis=1).compute()
Run Code Online (Sandbox Code Playgroud)
但是当运行上面的代码时,我会看到:
B val received: 1
B val received: 1
B val received: 1
B val received: 0
B val received: 0
B val received: 1
B val received: 2
B val received: 3
B val received: 4
Run Code Online (Sandbox Code Playgroud)
而不是0 - 4,我看到一系列1首先打印,并且额外的0.我注意到每次设置Dask DataFrame并对其运行apply操作时都会出现值1的"额外"行.
打印数据框在整个过程中不会显示值为1的其他行:
A B C
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 3
4 4 4 4
Run Code Online (Sandbox Code Playgroud)
我的问题是:这些值为1的行来自哪里?为什么它们似乎始终出现在数据框中的"实际"行之前?1值似乎与实际行中的值无关(也就是说,它不是因为某种原因再次抓住第二行).
@Grr的回答是正确的.Dask.dataframe不知道你的函数会产生什么,但仍然必须为你提供一个懒惰的dask.dataframe,它具有正确的类型,dtypes等,所以它会尝试你的函数一些数据.
您可以通过使用meta=关键字提供有关预期输出的元数据来避免这些检查(DataFrame.apply文档字符串中的更多详细信息).如果您提供此信息,那么Dask.dataframe将不需要尝试您的函数来确定类型.
在此处复制此部分:
meta:pd.DataFrame,pd.Series,dict,iterable,tuple,optional
与输出的dtypes和列名匹配的空pd.DataFrame或pd.Series.这个元数据对于dask数据帧中的许多算法都是必需的.为了便于使用,还可以使用一些替代输入.可以提供{name:dtype}或(name,dtype)的可迭代的dict而不是DataFrame.可以使用(name,dtype)的元组而不是系列.如果未提供,dask将尝试推断元数据.这可能会导致意外结果,因此建议提供meta.有关更多信息,请参阅dask.dataframe.utils.make_meta.
因此,如果您创建一个示例输出作为空数据帧,那么你会没事的:
meta = pd.DataFrame({'A': [1], 'B': [2], 'C': [3]},
columns=['A', 'B', 'C'])
ddf.apply(aggregate, axis=1, meta=meta)
Run Code Online (Sandbox Code Playgroud)
或者,在这种情况下,因为您的函数不会更改输入的列或dtype,您可以只使用输入的元
ddf.apply(aggregate, axis=1, meta=ddf.meta)
Run Code Online (Sandbox Code Playgroud)
在尝试在整个分区集合上执行此操作之前,Dask会对您告诉它要执行的操作进行一些检查.这是前几张印刷语句的来源.它是内置错误检查的一部分,它可以阻止Dask进行一些冗长的操作系列操作并最终失败.