Dask: assigning to a DataFrame column by index raises ValueError

Apo*_*los 5 python dataframe pandas dask dask-distributed

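I have a transformation pipeline over a grouped dataframe. Every function takes a DataFrameGroupBy and computes some features, which are then stored in a dataframe. The indexes of these dataframes are identical, because all features are derived from the same DataFrameGroupBy object. The functions look like this: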

def function(group_by_df, features_df=None):
    # actions to perform to group_by_df e.g
    feature_max = group_by_df.column.max() # This is a series object with index the same as group_by_df
    if features_df is not None:
        features_df['feature_name'] = feature_max
    else:
        features_df = feature_max.to_frame(name='feature_name')
    return features_df

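Since this is iterative, features_df is None on the first call, so the dataframe is created. On every later call, features_df already has a column for each of the previous features. In one of the steps, when I try to assign a series derived from group_by_df to features_df, I get the following error: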

ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.

The strange part is that running the following code:

features_pandas = features_df.compute()
feature_series_with_issue_pandas = feature_series_with_issue.compute()
features_pandas['feature_name'] = feature_series_with_issue_pandas

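works. That is, if I isolate the feature that fails and assign it to the dataframe built so far, but do it on the pandas side, the assignment succeeds. Am I doing something wrong?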
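For context on why the pandas version succeeds: once everything is computed, pandas has the whole index in memory and aligns the assigned series by label. A small sketch with hypothetical stand-in values (not my real features) shows this alignment even when the two indexes are ordered differently:

```python
import pandas as pd

# Hypothetical stand-ins for the computed features (values are made up).
features_pandas = pd.DataFrame(
    {"feature_one": [True, False]},
    index=pd.Index(["a", "b"], name="username"),
)

# Same index labels, deliberately in a different order.
feature_two_pandas = pd.Series(
    [0.4, 0.3],
    index=pd.Index(["b", "a"], name="username"),
    name="feature_two",
)

# pandas matches rows by index label, not by position.
features_pandas["feature_two"] = feature_two_pandas

print(features_pandas)
```

Dask cannot do the same label lookup lazily across partitions unless it knows the divisions, which seems to be what the error message is pointing at.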

Adding an MCVE:

import pandas as pd
import dask.dataframe as dd

raw_data = pd.DataFrame({'username': list('ab')*10, 'user_agent': list('cdef')*5, 'method': ['POST']*20, 'dst_port': [80]*20, 'dst': ['1.1.1.1']*20})
past = pd.DataFrame({'user_agent': list('cde'), 'percent': [0.3, 0.3, 0.4]})
dask_raw = dd.from_pandas(raw_data, npartitions=4)
dask_past = dd.from_pandas(past, npartitions=4)
dask_past = dask_past.set_index('user_agent')
merged_raw = dask_raw.merge(dask_past, how='left', left_on='user_agent', right_index=True)
grouped_by_df = merged_raw.groupby(['username', 'dst', 'dst_port'])
feature_one = grouped_by_df.apply(lambda x: 'POST' in x.values, meta=('feature_one', '?'))
features = feature_one.to_frame(name='feature_one')
feature_two = grouped_by_df.percent.min()
feature_two = feature_two.fillna(0)
features['feature_two'] = feature_two  # raises the ValueError below

Traceback (most recent call last):
  File "/home/avlach/virtualenvs/enorasys_sa_v2/local/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 2882, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-33-92f58e5ed5a0>", line 1, in <module>
    features['feature_two'] = feature_two
  File "/home/avlach/virtualenvs/enorasys_sa_v2/local/lib/python2.7/site-packages/dask/dataframe/core.py", line 2319, in __setitem__
    df = self.assign(**{key: value})
  File "/home/avlach/virtualenvs/enorasys_sa_v2/local/lib/python2.7/site-packages/dask/dataframe/core.py", line 2498, in assign
    return elemwise(methods.assign, self, *pairs, meta=df2)
  File "/home/avlach/virtualenvs/enorasys_sa_v2/local/lib/python2.7/site-packages/dask/dataframe/core.py", line 3028, in elemwise
    args = _maybe_align_partitions(args)
  File "/home/avlach/virtualenvs/enorasys_sa_v2/local/lib/python2.7/site-packages/dask/dataframe/multi.py", line 147, in _maybe_align_partitions
    dfs2 = iter(align_partitions(*dfs)[0])
  File "/home/avlach/virtualenvs/enorasys_sa_v2/local/lib/python2.7/site-packages/dask/dataframe/multi.py", line 103, in align_partitions
    raise ValueError("Not all divisions are known, can't align "
ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.