How to transpose a dask dataframe (convert columns to rows) for tidy data principles


TL;DR: I created a dask dataframe from a dask bag. The dask dataframe treats every observation (event) as a column. So instead of having a row of data per event, I have a column per event. The goal is to transpose the columns into rows, the same way pandas can transpose a dataframe with df.T.

Details: I have sample Twitter data from my timeline. To get to my starting point, here is the code that reads the json from disk into a dask.bag and then converts the bag into a dask.dataframe:

import dask.bag as db
import dask.dataframe as dd
import json


# read the raw json from disk into a bag, parsing each line
b = db.read_text('./sampleTwitter.json').map(json.loads)

# convert the bag to a dask dataframe and inspect the first rows
df = b.to_dataframe()
df.head()

The problem: all of my individual events (i.e. tweets) are recorded as columns rather than rows. In keeping with tidy principles, I would like each event to have its own row. pandas has a transpose method for dataframes, and dask.array has a transpose for arrays; my goal is to do the same transpose operation on a dask dataframe. How can I do that?
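For scale: with plain pandas the transpose itself is trivial, but it pulls everything into memory. A minimal sketch of that in-memory fallback (assuming the data fits in RAM), which is exactly what I'd like to avoid for large data:

import dask.dataframe as dd

# pull everything into a single pandas dataframe (only viable for small data)
pdf = df.compute()

# pandas transpose: events move from columns to rows
tidy = pdf.T

# re-wrap as a dask dataframe if further out-of-core work is needed
ddf = dd.from_pandas(tidy, npartitions=1)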


EDIT: Solution

This code sidesteps the original transpose problem, cleans the Twitter json files by defining the columns to keep and dropping the rest, and creates a new column by applying a function to a Series. Then we write the much smaller, cleaned files back to disk.

import dask.dataframe as dd
from dask.delayed import delayed
import dask.bag as db
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler, CacheProfiler
import pandas as pd
import json
import glob
import os

# pull in all the files (glob does not expand '~', so expand it explicitly)
filenames = glob.glob(os.path.expanduser('~/sampleTwitter*.json'))


# lazily read each json file into a pandas dataframe, then assemble one dask dataframe
dfs = [delayed(pd.read_json)(fn, orient='records') for fn in filenames]
df = dd.from_delayed(dfs)


# identify the fields we want to keep
keepers = ['coordinates', 'id', 'user', 'created_at', 'lang']

# everything not in keepers gets dropped
unwanted = [f for f in df.columns if f not in keepers]
df = df.drop(unwanted, axis=1)

# flatten the nested GeoJSON 'coordinates' field into a simple (lon, lat) tuple
clean = df.coordinates.apply(lambda x: (x['coordinates'][0], x['coordinates'][1]), meta=('coords', tuple))
df['coords'] = clean

# build new filenames from the old ones to save the cleaned files
# (strip directory and extension, then append 'cleaned.json')
newfilenames = [os.path.splitext(os.path.basename(fn))[0] + 'cleaned.json'
                for fn in filenames]

# custom saver function for dataframes using newfilenames
def saver(frame,filename):
    return frame.to_json('./'+filename)

# convert back to a list of delayed pandas dataframes, one per partition
# (from_delayed above made one partition per input file, so the zip lines up)
dfs = df.to_delayed()
writes = [delayed(saver)(frame, fn) for frame, fn in zip(dfs, newfilenames)]

# writing the cleaned, MUCH smaller objects back to disk
dd.compute(*writes)
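As a quick sanity check, one of the cleaned files can be read back with plain pandas. The filename here is an assumption derived from the renaming scheme above; adjust it to whatever your run actually produced:

import pandas as pd

# hypothetical output name from the renaming step above
check = pd.read_json('./sampleTwittercleaned.json')
print(check.columns)  # expect only the kept fields plus 'coords'
print(check.head())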

Answer by Mik*_*ham:

I think you can get the result you want by bypassing the bag entirely, with code like this:

import glob

import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

filenames = glob.glob('sampleTwitter*.json')
dfs = [delayed(pd.read_json)(fn, orient='records') for fn in filenames]
ddf = dd.from_delayed(dfs)
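From there the usual dask.dataframe operations apply; a brief usage sketch:

# tweet fields are columns; each tweet is its own row, as tidy data wants
print(ddf.columns)
print(ddf.head())

# total number of tweets (this triggers a computation)
print(len(ddf))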