Zel*_*ny7 913 python hdf5 large-data mongodb pandas
在学习大熊猫的过程中,我试图解决这个问题的答案已有好几个月了.我使用SAS进行日常工作,这非常适合它的核心支持.然而,由于其他许多原因,SAS作为一款软件非常糟糕.
有一天,我希望用python和pandas替换我对SAS的使用,但我目前缺乏大型数据集的核心工作流程.我不是在谈论需要分布式网络的"大数据",而是说文件太大而无法容纳在内存中,但又足够小以适应硬盘驱动器.
我的第一个想法是用于HDFStore
在磁盘上保存大型数据集,并仅将我需要的部分拉入数据帧进行分析.其他人提到MongoDB是一种更容易使用的替代品.我的问题是:
有哪些最佳实践工作流程可用于完成以下任务:
真实世界的例子将非常受欢迎,尤其是那些在"大数据"上使用熊猫的人.
编辑 - 我希望如何工作的示例:
我正在尝试找到执行这些步骤的最佳实践方法.阅读关于pandas和pytables的链接似乎附加一个新列可能是个问题.
编辑 - 特别回应杰夫的问题:
if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'
.这些操作的结果是我的数据集中每条记录的新列.我很少会在数据集中添加行.我几乎总是会创建新的列(统计/机器学习用语中的变量或特征).
Jef*_*eff 581
我通常以这种方式使用数十亿字节的数据,例如我在磁盘上有表格,我通过查询读取,创建数据并追加.
值得阅读文档,并在本主题的后期提供有关如何存储数据的若干建议.
详细信息将影响您存储数据的方式,例如:
尽可能多地提供详细信息; 我可以帮你建立一个结构.
由于pytables被优化为按行进行操作(这是您查询的内容),因此我们将为每组字段创建一个表.通过这种方式,可以轻松选择一小组字段(可以使用大表格,但这样做效率更高......我想我将来可以修复这个限制...这是无论如何更直观):(
以下是伪代码.)
import numpy as np
import pandas as pd
# create a store
store = pd.HDFStore('mystore.h5')
# this is the key to your storage:
# this maps your fields to a specific group, and defines
# what you want to have as data_columns.
# you might want to create a nice class wrapping this
# (as you will want to have this map and its inversion)
group_map = dict(
A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
B = dict(fields = ['field_10',...... ], dc = ['field_10']),
.....
REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),
)
group_map_inverted = dict()
for g, v in group_map.items():
group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))
Run Code Online (Sandbox Code Playgroud)
读取文件并创建存储(基本上做什么append_to_multiple
):
for f in files:
# read in the file, additional options hmay be necessary here
# the chunksize is not strictly necessary, you may be able to slurp each
# file into memory in which case just eliminate this part of the loop
# (you can also change chunksize if necessary)
for chunk in pd.read_table(f, chunksize=50000):
# we are going to append to each table by group
# we are not going to create indexes at this time
# but we *ARE* going to create (some) data_columns
# figure out the field groupings
for g, v in group_map.items():
# create the frame for this group
frame = chunk.reindex(columns = v['fields'], copy = False)
# append it
store.append(g, frame, index=False, data_columns = v['dc'])
Run Code Online (Sandbox Code Playgroud)
现在你已经拥有了文件中的所有表格(实际上你可以将它们存储在单独的文件中,如果你愿意,你可能需要将文件名添加到group_map,但可能这不是必需的).
这是您获取列并创建新列的方法:
frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
# select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows
# do calculations on this frame
new_frame = cool_function_on_frame(frame)
# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)
Run Code Online (Sandbox Code Playgroud)
当您准备好进行post_processing时:
# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)
Run Code Online (Sandbox Code Playgroud)
关于data_columns,您实际上不需要定义任何 data_columns; 它们允许您根据列子选择行.例如:
store.select(group, where = ['field_1000=foo', 'field_1001>0'])
Run Code Online (Sandbox Code Playgroud)
在最终报告生成阶段,它们可能对您最感兴趣(实质上,数据列与其他列隔离,如果您定义了很多,这可能会影响效率).
您可能还想:
如果您有疑问,请告诉我!
use*_*356 126
我认为上面的答案缺少一个我发现非常有用的简单方法.
当我的文件太大而无法加载到内存中时,我会将文件分解为多个较小的文件(按行或列)
示例:如果30天大小的交易数据为30天,我将其分成每天约1GB大小的文件.我随后分别处理每个文件并在最后汇总结果
其中一个最大的优点是它允许并行处理文件(多个线程或进程)
另一个优点是文件操作(如在示例中添加/删除日期)可以通过常规shell命令来完成,这在更高级/复杂的文件格式中是不可能的
这种方法并不涵盖所有场景,但在很多场景中非常有用
Pri*_*ate 69
现在,在问题发生两年后,一个"核心外"的熊猫相当于:dask.太棒了!虽然它不支持所有的熊猫功能,但你可以使用它.
rju*_*ney 59
如果您的数据集在1到20GB之间,那么您应该得到一个具有48GB RAM的工作站.然后Pandas可以将整个数据集保存在RAM中.我知道这不是你在这里寻找的答案,但在4GB内存的笔记本电脑上进行科学计算是不合理的.
bri*_*ler 46
这是pymongo的情况.我还在python中使用sql server,sqlite,HDF,ORM(SQLAlchemy)进行原型设计.首先,pymongo是基于文档的DB,因此每个人都是(dict
属性的)文档.许多人组成一个集合,你可以有很多集合(人,股市,收入).
pd.dateframe - > pymongo注意:我使用chunksize
in read_csv
来保持5到10k记录(如果更大,pymongo会丢弃套接字)
aCollection.insert((a[1].to_dict() for a in df.iterrows()))
Run Code Online (Sandbox Code Playgroud)
查询:gt =大于...
pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))
Run Code Online (Sandbox Code Playgroud)
.find()
返回一个迭代器,所以我通常ichunked
用来切入更小的迭代器.
加入怎么样,因为我通常会将10个数据源粘贴在一起:
aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))
Run Code Online (Sandbox Code Playgroud)
然后(在我的情况下,有时我必须aJoinDF
首先在其"合并"之前进行聚合.)
df = pandas.merge(df, aJoinDF, on=aKey, how='left')
Run Code Online (Sandbox Code Playgroud)
然后,您可以通过下面的更新方法将新信息写入主集合.(逻辑集合与物理数据源).
collection.update({primarykey:foo},{key:change})
Run Code Online (Sandbox Code Playgroud)
在较小的查找上,只是非规范化.例如,您在文档中有代码,只需添加字段代码文本并在dict
创建文档时进行查找.
现在你有一个基于一个人的好数据集,你可以在每个案例中释放你的逻辑并创建更多属性.最后,你可以阅读大熊猫你的3到内存最大关键指标,并进行枢轴/聚合/数据探索.这对我有300万条记录的数字/大文/类别/代码/花车/ ...
您还可以使用MongoDB中内置的两种方法(MapReduce和聚合框架).有关聚合框架的更多信息,请参见此处,因为它似乎比MapReduce更容易,并且看起来很方便快速聚合工作.注意我不需要定义我的字段或关系,我可以添加项目到文档.在快速变化的numpy,pandas,python工具集的当前状态下,MongoDB帮助我开始工作:)
Joh*_*man 37
我发现这有点晚了,但我处理类似的问题(按揭预付款模式).我的解决方案是跳过pandas HDFStore层并使用直接pytables.我将每列保存为最终文件中的单个HDF5阵列.
我的基本工作流程是首先从数据库中获取CSV文件.我gzip它,所以它不是那么大.然后我将其转换为面向行的HDF5文件,通过在python中迭代它,将每一行转换为实际数据类型,并将其写入HDF5文件.这需要几十分钟,但它不使用任何内存,因为它只是逐行操作.然后我将面向行的HDF5文件"转置"为面向列的HDF5文件.
表转置如下:
def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
# Get a reference to the input data.
tb = h_in.getNode(table_path)
# Create the output group to hold the columns.
grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
for col_name in tb.colnames:
logger.debug("Processing %s", col_name)
# Get the data.
col_data = tb.col(col_name)
# Create the output array.
arr = h_out.createCArray(grp,
col_name,
tables.Atom.from_dtype(col_data.dtype),
col_data.shape)
# Store the data.
arr[:] = col_data
h_out.flush()
Run Code Online (Sandbox Code Playgroud)
读回来然后看起来像:
def read_hdf5(hdf5_path, group_path="/data", columns=None):
"""Read a transposed data set from a HDF5 file."""
if isinstance(hdf5_path, tables.file.File):
hf = hdf5_path
else:
hf = tables.openFile(hdf5_path)
grp = hf.getNode(group_path)
if columns is None:
data = [(child.name, child[:]) for child in grp]
else:
data = [(child.name, child[:]) for child in grp if child.name in columns]
# Convert any float32 columns to float64 for processing.
for i in range(len(data)):
name, vec = data[i]
if vec.dtype == np.float32:
data[i] = (name, vec.astype(np.float64))
if not isinstance(hdf5_path, tables.file.File):
hf.close()
return pd.DataFrame.from_items(data)
Run Code Online (Sandbox Code Playgroud)
现在,我通常在拥有大量内存的机器上运行它,所以我可能对内存使用情况不够谨慎.例如,默认情况下,加载操作会读取整个数据集.
这通常适合我,但它有点笨重,我不能使用花哨的pytables魔术.
编辑:这种方法的真正优势在于记录数组的pytables默认值,然后我可以使用h5r将数据加载到R中,而h5r无法处理表.或者,至少,我无法让它加载异构表.
yts*_*aig 29
我发现有助于大数据用例的一个技巧是通过将浮点精度降低到32位来减少数据量.它并不适用于所有情况,但在许多应用中,64位精度是过度的,节省2倍的内存是值得的.使一个明显的观点更加明显:
>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB
>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB
Run Code Online (Sandbox Code Playgroud)
wp7*_*8de 18
正如其他人所指出的那样,经过几年的出现,出现了"核心外"的大熊猫等同物:dask.虽然dask不是大熊猫及其所有功能的直接替代品,但它有以下几个原因:
Dask是一个灵活的分析计算并行计算库,针对"大数据"集合的交互式计算工作负载的动态任务调度进行了优化,如并行数组,数据帧和列表,将NumPy,Pandas或Python迭代器等常用接口扩展为更大 - 超过内存或分布式环境,并从笔记本电脑扩展到集群.
Dask强调以下优点:
- 熟悉:提供并行化的NumPy数组和Pandas DataFrame对象
- 灵活:为更多自定义工作负载和与其他项目的集成提供任务调度界面.
- Native:通过访问PyData堆栈,在Pure Python中启用分布式计算.
- 快速:以低开销,低延迟和快速数值算法所需的最小序列化运行
- 向上扩展:在具有1000个内核的集群上弹性运行向下扩展:在单个进程中设置并在笔记本电脑上运行的简单操作
- 响应:设计时考虑到交互式计算,它提供快速反馈和诊断以帮助人类
并添加一个简单的代码示例:
import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()
Run Code Online (Sandbox Code Playgroud)
替换一些像这样的pandas代码:
import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()
Run Code Online (Sandbox Code Playgroud)
并且,特别值得注意的是,通过concurrent.futures接口提供了一个用于提交自定义任务的通用:
from dask.distributed import Client
client = Client('scheduler:port')
futures = []
for fn in filenames:
future = client.submit(load, fn)
futures.append(future)
summary = client.submit(summarize, futures)
summary.result()
Run Code Online (Sandbox Code Playgroud)
Oph*_*tan 13
还有一个变种
在pandas中完成的许多操作也可以作为db查询完成(sql,mongo)
使用RDBMS或mongodb可以在DB Query中执行一些聚合(针对大数据进行优化,并有效地使用缓存和索引)
之后,您可以使用pandas执行后期处理.
这种方法的优点是你可以获得数据库优化来处理大数据,同时仍然用高级声明性语法定义逻辑 - 而不必处理决定在内存中做什么和做什么的细节核心.
虽然查询语言和pandas不同,但将部分逻辑从一个转换为另一个通常并不复杂.
Rob*_*Rob 13
我想指出 Vaex 包。
Vaex 是一个用于惰性 Out-of-Core DataFrames(类似于 Pandas)的 Python 库,用于可视化和探索大型表格数据集。它可以在每秒高达十亿 (10 9 ) 个对象/行的 N 维网格上计算统计数据,例如平均值、总和、计数、标准偏差等。可视化是使用直方图、密度图和 3D 体积渲染完成的,允许对大数据进行交互式探索。Vaex 使用内存映射、零内存复制策略和延迟计算来获得最佳性能(不浪费内存)。
看一下文档:https : //vaex.readthedocs.io/en/latest/ API 非常接近 Pandas 的 API。
lev*_*lev 10
这里值得一提的是Ray,
它是一个分布式计算框架,它以分布式方式为pandas提供了自己的实现.
只需替换pandas导入,代码应该按原样运行:
# import pandas as pd
import ray.dataframe as pd
#use pd as usual
Run Code Online (Sandbox Code Playgroud)
可以在这里阅读更多细节:
https://rise.cs.berkeley.edu/blog/pandas-on-ray/
小智 6
我最近遇到了类似的问题.我发现只是以块的形式读取数据并附加它,因为我以块的形式将它写入相同的csv.我的问题是根据另一个表中的信息添加日期列,使用某些列的值,如下所示.这可能会帮助那些被dask和hdf5困惑的人,但更熟悉像我这样的熊猫.
def addDateColumn():
"""Adds time to the daily rainfall data. Reads the csv as chunks of 100k
rows at a time and outputs them, appending as needed, to a single csv.
Uses the column of the raster names to get the date.
"""
df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True,
chunksize=100000) #read csv file as 100k chunks
'''Do some stuff'''
count = 1 #for indexing item in time list
for chunk in df: #for each 100k rows
newtime = [] #empty list to append repeating times for different rows
toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
while count <= toiterate.max():
for i in toiterate:
if i ==count:
newtime.append(newyears[count])
count+=1
print "Finished", str(chunknum), "chunks"
chunk["time"] = newtime #create new column in dataframe based on time
outname = "CHIRPS_tanz_time2.csv"
#append each output to same csv, using no header
chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
272326 次 |
最近记录: |