标签: castra

dask计算不是并行执行的

我有一个json文件目录,我试图转换为dask DataFrame并将其保存到castra.有200个文件,它们之间包含O(10**7)个json记录.代码非常简单,主要遵循教程示例.

import dask.dataframe as dd
import dask.bag as db
import json
txt = db.from_filenames('part-*.json')
js = txt.map(json.loads)
df = js.to_dataframe()
cs=df.to_castra("data.castra")
Run Code Online (Sandbox Code Playgroud)

我在32核机器上运行它,但代码只使用100%的一个核心.我对文档的理解是这段代码并行执行.为什么不呢?我误解了什么吗?

python concurrency python-multiprocessing dask castra

8
推荐指数
1
解决办法
1101
查看次数

Dask DataFrame:对多行的 groupby 对象重新采样

我有以下从 Castra 创建的 dask 数据框:

import dask.dataframe as dd

df = dd.from_castra('data.castra', columns=['user_id','ts','text'])
Run Code Online (Sandbox Code Playgroud)

产量:

                      user_id / ts                  / text
ts
2015-08-08 01:10:00   9235      2015-08-08 01:10:00   a
2015-08-08 02:20:00   2353      2015-08-08 02:20:00   b
2015-08-08 02:20:00   9235      2015-08-08 02:20:00   c
2015-08-08 04:10:00   9235      2015-08-08 04:10:00   d
2015-08-08 08:10:00   2353      2015-08-08 08:10:00   e
Run Code Online (Sandbox Code Playgroud)

我想要做的是:

  1. user_id和分组ts
  2. 在 3 小时内重新采样
  3. 在重采样步骤中,任何合并的行都应该连接文本

示例输出:

                                text
user_id   ts
9235      2015-08-08 00:00:00   ac
          2015-08-08 03:00:00   d
2353      2015-08-08 00:00:00   b
          2015-08-08 06:00:00   e
Run Code Online (Sandbox Code Playgroud)

我尝试了以下方法:

df.groupby(['user_id','ts'])['text'].sum().resample('3H', how='sum').compute() …
Run Code Online (Sandbox Code Playgroud)

python dataframe pandas dask castra

5
推荐指数
1
解决办法
3070
查看次数