eve*_*007 19 python memory multiprocessing pandas python-multiprocessing
我已将程序(如下)写入:
pandas dataframegroupby使用特定列值拆分数据并存储为数据帧列表.multiprocess Pool.map()并行处理每个数据帧.一切都很好,该程序在我的小测试数据集上运行良好.但是,当我输入大数据(大约14 GB)时,内存消耗呈指数级增长,然后冻结计算机或被杀死(在HPC群集中).
一旦数据/变量无效,我就添加了代码来清除内存.一旦完成,我也正在关闭游泳池.仍然有14 GB的输入我只期望2*14 GB的内存负担,但似乎很多正在进行.我也尝试使用调整,chunkSize and maxTaskPerChild, etc但我没有看到测试与大文件的优化有任何区别.
我认为,当我开始时,在此代码位置需要对此代码进行改进multiprocessing.
p = Pool(3) # number of pool to run at once; default at 1
result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
但是,我发布了整个代码.
测试示例:我创建了一个高达250 mb的测试文件("genome_matrix_final-chr1234-1mb.txt")并运行该程序.当我检查系统监视器时,我可以看到内存消耗增加了大约6 GB.我不太清楚为什么250 mb文件加上一些输出需要这么大的内存空间.如果它有助于查看真正的问题,我通过下拉框共享该文件.https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0
有人可以建议,我怎么能摆脱这个问题?
我的python脚本:
#!/home/bin/python3
import pandas as pd
import collections
from multiprocessing import Pool
import io
import time
import resource
print()
print('Checking required modules')
print()
''' change this input file name and/or path as need be '''
genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt" # test file 01
genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt" # test file 02
#genome_matrix_file = "genome_matrix_final.txt" # large file
def main():
with open("genome_matrix_header.txt") as header:
header = header.read().rstrip('\n').split('\t')
print()
time01 = time.time()
print('starting time: ', time01)
'''load the genome matrix file onto pandas as dataframe.
This makes is more easy for multiprocessing'''
gen_matrix_df = pd.read_csv(genome_matrix_file, sep='\t', names=header)
# now, group the dataframe by chromosome/contig - so it can be multiprocessed
gen_matrix_df = gen_matrix_df.groupby('CHROM')
# store the splitted dataframes as list of key, values(pandas dataframe) pairs
# this list of dataframe will be used while multiprocessing
gen_matrix_df_list = collections.OrderedDict()
for chr_, data in gen_matrix_df:
gen_matrix_df_list[chr_] = data
# clear memory
del gen_matrix_df
'''Now, pipe each dataframe from the list using map.Pool() '''
p = Pool(3) # number of pool to run at once; default at 1
result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
del gen_matrix_df_list # clear memory
p.close()
p.join()
# concat the results from pool.map() and write it to a file
result_merged = pd.concat(result)
del result # clear memory
pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep='\t', header=True, index=False)
print()
print('completed all process in "%s" sec. ' % (time.time() - time01))
print('Global maximum memory usage: %.2f (mb)' % current_mem_usage())
print()
'''function to convert the dataframe from genome matrix to desired output '''
def matrix_to_vcf(matrix_df):
print()
time02 = time.time()
# index position of the samples in genome matrix file
sample_idx = [{'10a': 33, '10b': 18}, {'13a': 3, '13b': 19},
{'14a': 20, '14b': 4}, {'16a': 5, '16b': 21},
{'17a': 6, '17b': 22}, {'23a': 7, '23b': 23},
{'24a': 8, '24b': 24}, {'25a': 25, '25b': 9},
{'26a': 10, '26b': 26}, {'34a': 11, '34b': 27},
{'35a': 12, '35b': 28}, {'37a': 13, '37b': 29},
{'38a': 14, '38b': 30}, {'3a': 31, '3b': 15},
{'8a': 32, '8b': 17}]
# sample index stored as ordered dictionary
sample_idx_ord_list = []
for ids in sample_idx:
ids = collections.OrderedDict(sorted(ids.items()))
sample_idx_ord_list.append(ids)
# for haplotype file
header = ['contig', 'pos', 'ref', 'alt']
# adding some suffixes "PI" to available sample names
for item in sample_idx_ord_list:
ks_update = ''
for ks in item.keys():
ks_update += ks
header.append(ks_update+'_PI')
header.append(ks_update+'_PG_al')
#final variable store the haplotype data
# write the header lines first
haplotype_output = '\t'.join(header) + '\n'
# to store the value of parsed the line and update the "PI", "PG" value for each sample
updated_line = ''
# read the piped in data back to text like file
matrix_df = pd.DataFrame.to_csv(matrix_df, sep='\t', index=False)
matrix_df = matrix_df.rstrip('\n').split('\n')
for line in matrix_df:
if line.startswith('CHROM'):
continue
line_split = line.split('\t')
chr_ = line_split[0]
ref = line_split[2]
alt = list(set(line_split[3:]))
# remove the alleles "N" missing and "ref" from the alt-alleles
alt_up = list(filter(lambda x: x!='N' and x!=ref, alt))
# if no alt alleles are found, just continue
# - i.e : don't write that line in output file
if len(alt_up) == 0:
continue
#print('\nMining data for chromosome/contig "%s" ' %(chr_ ))
#so, we have data for CHR, POS, REF, ALT so far
# now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag)
sample_data_for_vcf = []
for ids in sample_idx_ord_list:
sample_data = []
for key, val in ids.items():
sample_value = line_split[val]
sample_data.append(sample_value)
# now, update the phased state for each sample
# also replacing the missing allele i.e "N" and "-" with ref-allele
sample_data = ('|'.join(sample_data)).replace('N', ref).replace('-', ref)
sample_data_for_vcf.append(str(chr_))
sample_data_for_vcf.append(sample_data)
# add data for all the samples in that line, append it with former columns (chrom, pos ..) ..
# and .. write it to final haplotype file
sample_data_for_vcf = '\t'.join(sample_data_for_vcf)
updated_line = '\t'.join(line_split[0:3]) + '\t' + ','.join(alt_up) + \
'\t' + sample_data_for_vcf + '\n'
haplotype_output += updated_line
del matrix_df # clear memory
print('completed haplotype preparation for chromosome/contig "%s" '
'in "%s" sec. ' %(chr_, time.time()-time02))
print('\tWorker maximum memory usage: %.2f (mb)' %(current_mem_usage()))
# return the data back to the pool
return pd.read_csv(io.StringIO(haplotype_output), sep='\t')
''' to monitor memory '''
def current_mem_usage():
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
赏金猎人的更新:
我已经实现了多处理,Pool.map()但是代码导致了很大的内存负担(输入测试文件大约300 MB,但内存负担大约为6 GB).我只期望最大3*300 mb的内存负担.
saa*_*aaj 20
在Python中(下面我使用64位构建的Python 3.6.5),一切都是对象.这有其开销,getsizeof我们可以确切地看到对象的大小(以字节为单位):
>>> import sys
>>> sys.getsizeof(42)
28
>>> sys.getsizeof('T')
50
Run Code Online (Sandbox Code Playgroud)multiprocessing.get_start_method())创建子进程时,不会复制父进程的物理内存,并使用写时复制技术.
- 进程A具有50 KiB的非共享内存
- 进程B具有300 KiB的非共享内存
- 进程A和进程B都具有相同共享内存区域的100 KiB
由于PSS被定义为进程的非共享内存和与其他进程共享的内存比例的总和,因此这两个进程的PSS如下:
- 过程A的PSS = 50 KiB +(100 KiB/2)= 100 KiB
- 过程B的PSS = 300 KiB +(100 KiB/2)= 350 KiB
不要让你DataFrame独自一人.memory_profiler会帮助我们.
justpd.py
#!/usr/bin/env python3
import pandas as pd
from memory_profiler import profile
@profile
def main():
with open('genome_matrix_header.txt') as header:
header = header.read().rstrip('\n').split('\t')
gen_matrix_df = pd.read_csv(
'genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
gen_matrix_df.info()
gen_matrix_df.info(memory_usage='deep')
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
现在让我们使用探查器:
mprof run justpd.py
mprof plot
Run Code Online (Sandbox Code Playgroud)
我们可以看到情节:
和逐行跟踪:
Line # Mem usage Increment Line Contents
================================================
6 54.3 MiB 54.3 MiB @profile
7 def main():
8 54.3 MiB 0.0 MiB with open('genome_matrix_header.txt') as header:
9 54.3 MiB 0.0 MiB header = header.read().rstrip('\n').split('\t')
10
11 2072.0 MiB 2017.7 MiB gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
12
13 2072.0 MiB 0.0 MiB gen_matrix_df.info()
14 2072.0 MiB 0.0 MiB gen_matrix_df.info(memory_usage='deep')
Run Code Online (Sandbox Code Playgroud)
我们可以看到数据帧在构建时需要~2 GiB,峰值在~3 GiB.更有趣的是输出info.
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4000000 entries, 0 to 3999999
Data columns (total 34 columns):
...
dtypes: int64(2), object(32)
memory usage: 1.0+ GB
Run Code Online (Sandbox Code Playgroud)
但是info(memory_usage='deep')("深层"意味着通过询问object dtypes来深入反省数据,见下文)给出:
memory usage: 7.9 GB
Run Code Online (Sandbox Code Playgroud)
咦?在这个过程之外,我们可以确保这些 memory_profiler数据是正确的.sys.getsizeof也显示了(因为自定义的最有可能的帧相同的值__sizeof__)等将要估算分配使用它的其他工具gc.get_objects(),例如pympler.
# added after read_csv
from pympler import tracker
tr = tracker.SummaryTracker()
tr.print_diff()
Run Code Online (Sandbox Code Playgroud)
得到:
types | # objects | total size
================================================== | =========== | ============
<class 'pandas.core.series.Series | 34 | 7.93 GB
<class 'list | 7839 | 732.38 KB
<class 'str | 7741 | 550.10 KB
<class 'int | 1810 | 49.66 KB
<class 'dict | 38 | 7.43 KB
<class 'pandas.core.internals.SingleBlockManager | 34 | 3.98 KB
<class 'numpy.ndarray | 34 | 3.19 KB
Run Code Online (Sandbox Code Playgroud)
那么这7.93 GiB来自哪里?我们试着解释一下.我们有4M行和34列,它们给出了134M的值.它们是int64或者object(它是64位指针; 有关详细说明,请参阅使用大数据的pandas).因此,我们134 * 10 ** 6 * 8 / 2 ** 20仅对数据帧中的值有~1022 MiB.剩下的~6.93 GiB怎么样?
要了解行为,必须知道Python会进行字符串实习.在Python 2中有两篇关于字符串实习的好文章(一,二).除了Python 3中的Unicode更改和Python 3.3中的PEP 393之外,C结构已经改变,但想法是相同的.基本上,每个看起来像标识符的短字符串都将由Python在内部字典中缓存,引用将指向相同的Python对象.换句话说,我们可以说它表现得像一个单身人士.我上面提到的文章解释了它给出的重要内存配置文件和性能改进.我们可以使用以下interned字段检查字符串是否被实习PyASCIIObject:
import ctypes
class PyASCIIObject(ctypes.Structure):
_fields_ = [
('ob_refcnt', ctypes.c_size_t),
('ob_type', ctypes.py_object),
('length', ctypes.c_ssize_t),
('hash', ctypes.c_int64),
('state', ctypes.c_int32),
('wstr', ctypes.c_wchar_p)
]
Run Code Online (Sandbox Code Playgroud)
然后:
>>> a = 'name'
>>> b = '!@#$'
>>> a_struct = PyASCIIObject.from_address(id(a))
>>> a_struct.state & 0b11
1
>>> b_struct = PyASCIIObject.from_address(id(b))
>>> b_struct.state & 0b11
0
Run Code Online (Sandbox Code Playgroud)
使用两个字符串,我们也可以进行身份比较(在CPython的情况下在内存比较中解决).
>>> a = 'foo'
>>> b = 'foo'
>>> a is b
True
>> gen_matrix_df.REF[0] is gen_matrix_df.REF[6]
True
Run Code Online (Sandbox Code Playgroud)
因此object dtype,就数据框而言,数据框最多分配20个字符串(每个氨基酸一个).但是,值得注意的是,Pandas建议使用枚举的分类类型.
因此,我们可以解释7.93 GiB的幼稚估计:
>>> rows = 4 * 10 ** 6
>>> int_cols = 2
>>> str_cols = 32
>>> int_size = 8
>>> str_size = 58
>>> ptr_size = 8
>>> (int_cols * int_size + str_cols * (str_size + ptr_size)) * rows / 2 ** 30
7.927417755126953
Run Code Online (Sandbox Code Playgroud)
请注意,这str_size是58字节,而不是50,正如我们在上面看到的1字符文字.这是因为PEP 393定义了紧凑和非紧凑的字符串.你可以检查一下sys.getsizeof(gen_matrix_df.REF[0]).
实际内存消耗应该是〜1 GiB,因为它的报告是gen_matrix_df.info(),它是两倍.我们可以假设它与Pandas或NumPy完成的内存(预)分配有关.以下实验表明它并非没有理由(多次运行显示保存图片):
Line # Mem usage Increment Line Contents
================================================
8 53.1 MiB 53.1 MiB @profile
9 def main():
10 53.1 MiB 0.0 MiB with open("genome_matrix_header.txt") as header:
11 53.1 MiB 0.0 MiB header = header.read().rstrip('\n').split('\t')
12
13 2070.9 MiB 2017.8 MiB gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
14 2071.2 MiB 0.4 MiB gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
15 2071.2 MiB 0.0 MiB gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
16 2040.7 MiB -30.5 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
...
23 1827.1 MiB -30.5 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
24 1094.7 MiB -732.4 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
25 1765.9 MiB 671.3 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
26 1094.7 MiB -671.3 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
27 1704.8 MiB 610.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
28 1094.7 MiB -610.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
29 1643.9 MiB 549.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
30 1094.7 MiB -549.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
31 1582.8 MiB 488.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
32 1094.7 MiB -488.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
33 1521.9 MiB 427.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
34 1094.7 MiB -427.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
35 1460.8 MiB 366.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
36 1094.7 MiB -366.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
37 1094.7 MiB 0.0 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
...
47 1094.7 MiB 0.0 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
Run Code Online (Sandbox Code Playgroud)
我想通过Pandas原作者关于设计问题和未来Pandas2的新文章引用本节.
pandas经验法则:RAM的数量是数据集大小的5到10倍
最后,让我们来到游泳池,看看是否可以使用写时复制.我们将使用smemstat(可从Ubuntu存储库中获得)来估计进程组内存共享并glances写下系统范围的可用内存.两者都可以编写JSON.
我们将运行原始脚本Pool(2).我们需要3个终端窗口.
smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1 glances -t 1 --export-json glances.jsonmprof run -M script.py然后mprof plot产生:
总和图(mprof run --nopython --include-children ./script.py)看起来像:
请注意,上面的两个图表显示了RSS.假设是因为写时复制它并不反映实际的内存使用情况.现在我们有来自smemstat和的两个JSON文件glances.我将使用以下脚本将JSON文件转换为CSV.
#!/usr/bin/env python3
import csv
import sys
import json
def smemstat():
with open('smemstat.json') as f:
smem = json.load(f)
rows = []
fieldnames = set()
for s in smem['smemstat']['periodic-samples']:
row = {}
for ps in s['smem-per-process']:
if 'script.py' in ps['command']:
for k in ('uss', 'pss', 'rss'):
row['{}-{}'.format(ps['pid'], k)] = ps[k] // 2 ** 20
# smemstat produces empty samples, backfill from previous
if rows:
for k, v in rows[-1].items():
row.setdefault(k, v)
rows.append(row)
fieldnames.update(row.keys())
with open('smemstat.csv', 'w') as out:
dw = csv.DictWriter(out, fieldnames=sorted(fieldnames))
dw.writeheader()
list(map(dw.writerow, rows))
def glances():
rows = []
fieldnames = ['available', 'used', 'cached', 'mem_careful', 'percent',
'free', 'mem_critical', 'inactive', 'shared', 'history_size',
'mem_warning', 'total', 'active', 'buffers']
with open('glances.csv', 'w') as out:
dw = csv.DictWriter(out, fieldnames=fieldnames)
dw.writeheader()
with open('glances.json') as f:
for l in f:
d = json.loads(l)
dw.writerow(d['mem'])
if __name__ == '__main__':
globals()[sys.argv[1]]()
Run Code Online (Sandbox Code Playgroud)
首先让我们来看看free记忆.
第一和最小之间的差异是~4.15 GiB.这是PSS数字的样子:
总和:
因此我们可以看到,由于写时复制实际内存消耗约为4.15 GiB.但我们仍然将数据序列化以通过它将其发送给工作进程Pool.map.我们也可以在这里利用写时复制吗?
要使用copy-on-write,我们需要list(gen_matrix_df_list.values())全局访问,以便fork之后的worker仍然可以读取它.
让我们修改后的代码del gen_matrix_df在main类似如下:
...
global global_gen_matrix_df_values
global_gen_matrix_df_values = list(gen_matrix_df_list.values())
del gen_matrix_df_list
p = Pool(2)
result = p.map(matrix_to_vcf, range(len(global_gen_matrix_df_values)))
...
Run Code Online (Sandbox Code Playgroud)del gen_matrix_df_list后来.并修改第一行matrix_to_vcf:
def matrix_to_vcf(i):
matrix_df = global_gen_matrix_df_values[i]
Run Code Online (Sandbox Code Playgroud)现在让我们重新运行它.空闲记忆:
过程树:
它的总和:
因此,我们的实际内存使用量最多为~2.9 GiB(构建数据帧时的峰值主进程)并且写入时复制有所帮助!
作为旁注,有一种所谓的copy-on-read,即Python 工程中描述的Python参考循环垃圾收集器的行为(导致gc.freeze问题31558).但gc.disable()在这种特殊情况下没有影响.
写时复制无副本数据共享的替代方法可以从一开始就通过使用将其委托给内核numpy.memmap.下面是一个示例实现从高性能数据处理在Python谈话.然后棘手的部分是让Pandas使用mmaped Numpy数组.
当您使用multiprocessing.Pool多个子进程时,将使用fork()系统调用创建。这些进程中的每一个都以当时父进程内存的精确副本开始。因为您在创建Pool大小为 3的 csv 之前加载 csv ,所以池中的这 3 个进程中的每一个都将不必要地拥有数据框的副本。(gen_matrix_df以及gen_matrix_df_list将存在于当前进程以及 3 个子进程中的每一个中,因此这些结构中的每一个的 4 个副本将在内存中)
尝试Pool在加载文件之前创建(实际上是在一开始)这应该会减少内存使用量。
如果它仍然太高,您可以:
将 gen_matrix_df_list 转储到文件中,每行 1 个项目,例如:
import os
import cPickle
with open('tempfile.txt', 'w') as f:
for item in gen_matrix_df_list.items():
cPickle.dump(item, f)
f.write(os.linesep)
Run Code Online (Sandbox Code Playgroud)Pool.imap()在您在此文件中转储的行上的迭代器上使用,例如:
with open('tempfile.txt', 'r') as f:
p.imap(matrix_to_vcf, (cPickle.loads(line) for line in f))
Run Code Online (Sandbox Code Playgroud)
(请注意,在上面的示例中matrix_to_vcf需要一个(key, value)元组,而不仅仅是一个值)
我希望这有帮助。
注意:我还没有测试上面的代码。这只是为了证明这个想法。
我遇到过同样的问题。我需要处理一个巨大的文本语料库,同时保持内存中加载的数百万行的几个 DataFrame 的知识库。我认为这个问题很常见,所以我将保持我的回答面向一般目的。
设置的组合为我解决了这个问题(1 & 3 & 5 可能只为你做):
使用Pool.imap(或imap_unordered)代替Pool.map。这将懒惰地迭代数据,而不是在开始处理之前将所有数据加载到内存中。
为chunksize参数设置一个值。这也会使imap速度更快。
为maxtasksperchild参数设置一个值。
将输出附加到磁盘而不是内存。当它达到一定大小时立即或每隔一段时间。
分批运行代码。如果您有迭代器,则可以使用itertools.islice。这个想法是将你的list(gen_matrix_df_list.values())列表分成三个或更多的列表,然后你只将前三分之一传递给mapor imap,然后在另一个运行中传递第二个三分之一,等等。因为你有一个列表,你可以简单地在同一行代码中将它切片。
| 归档时间: |
|
| 查看次数: |
5656 次 |
| 最近记录: |