我有一个json文件目录,我试图转换为dask DataFrame并将其保存到castra.有200个文件,它们之间包含O(10**7)个json记录.代码非常简单,主要遵循教程示例.
import dask.dataframe as dd
import dask.bag as db
import json
txt = db.from_filenames('part-*.json')
js = txt.map(json.loads)
df = js.to_dataframe()
cs=df.to_castra("data.castra")
Run Code Online (Sandbox Code Playgroud)
我在32核机器上运行它,但代码只使用100%的一个核心.我对文档的理解是这段代码并行执行.为什么不呢?我误解了什么吗?
我有以下从 Castra 创建的 dask 数据框:
import dask.dataframe as dd
df = dd.from_castra('data.castra', columns=['user_id','ts','text'])
Run Code Online (Sandbox Code Playgroud)
产量:
user_id / ts / text
ts
2015-08-08 01:10:00 9235 2015-08-08 01:10:00 a
2015-08-08 02:20:00 2353 2015-08-08 02:20:00 b
2015-08-08 02:20:00 9235 2015-08-08 02:20:00 c
2015-08-08 04:10:00 9235 2015-08-08 04:10:00 d
2015-08-08 08:10:00 2353 2015-08-08 08:10:00 e
Run Code Online (Sandbox Code Playgroud)
我想要做的是:
user_id和分组ts示例输出:
text
user_id ts
9235 2015-08-08 00:00:00 ac
2015-08-08 03:00:00 d
2353 2015-08-08 00:00:00 b
2015-08-08 06:00:00 e
Run Code Online (Sandbox Code Playgroud)
我尝试了以下方法:
df.groupby(['user_id','ts'])['text'].sum().resample('3H', how='sum').compute() …Run Code Online (Sandbox Code Playgroud)