Jas*_*ram 6 python optimization multithreading logfiles multiprocessing
我的笔记本电脑上用于4.2 GB输入文件的此代码的运行时间为48秒.输入文件以制表符分隔,每个值都显示在引号中.每条记录以换行符结尾,例如'"val1"\t"val2"\t"val3"\t..."valn"\n'
我尝试过使用10个线程进行多处理:一个用于排队,一个用于解析各行并填充一个输出队列,一个用于将输出队列缩减为如下所示的defaultdict,但代码需要300秒才能运行,比以下时间长6倍以上:
from collections import defaultdict
def get_users(log):
users = defaultdict(int)
f = open(log)
# Read header line
h = f.readline().strip().replace('"', '').split('\t')
ix_profile = h.index('profile.type')
ix_user = h.index('profile.id')
# If either ix_* is the last field in h, it will include a newline.
# That's fine for now.
for (i, line) in enumerate(f):
if i % 1000000 == 0: print "Line %d" % i # progress notification
l = line.split('\t')
if l[ix_profile] != '"7"': # "7" indicates a bad value
# use list slicing to remove quotes
users[l[ix_user][1:-1]] += 1
f.close()
return users
Run Code Online (Sandbox Code Playgroud)
我已经通过从for循环删除除了print语句之外的所有内容来检查我是不是受I/O约束.该代码在9秒内运行,我将考虑这个代码运行速度的下限.
我有很多这5 GB文件要处理,所以即使是运行时的一个很小的改进(我知道,我可以删除打印!)将有所帮助.我运行的机器有4个内核,所以我不禁想知道是否有办法让多线程/多进程代码比上面的代码运行得更快.
更新:
我重写了多处理代码如下:
from multiprocessing import Pool, cpu_count
from collections import defaultdict
def parse(line, ix_profile=10, ix_user=9):
"""ix_profile and ix_user predetermined; hard-coding for expedience."""
l = line.split('\t')
if l[ix_profile] != '"7"':
return l[ix_user][1:-1]
def get_users_mp():
f = open('20110201.txt')
h = f.readline() # remove header line
pool = Pool(processes=cpu_count())
result_iter = pool.imap_unordered(parse, f, 100)
users = defaultdict(int)
for r in result_iter:
if r is not None:
users[r] += 1
return users
Run Code Online (Sandbox Code Playgroud)
它运行26秒,加速1.85倍.不错,但有4个核心,没有我想象的那么多.
使用正则表达式。
测试确定该过程中最昂贵的部分是对 str.split() 的调用。可能必须为每一行构造一个列表和一堆字符串对象,成本很高。
首先,您需要构造一个正则表达式来匹配该行。就像是:
expression = re.compile(r'("[^"]")\t("[^"]")\t')
Run Code Online (Sandbox Code Playgroud)
如果您调用 expression.match(line).groups(),您将获得前两列提取为两个字符串对象,并且您可以直接对它们进行逻辑处理。
现在假设感兴趣的两列是前两列。如果没有,您只需调整正则表达式以匹配正确的列。您的代码检查标题以查看列所在的位置。您可以基于此生成正则表达式,但我猜这些列实际上总是位于同一位置。只需验证它们是否仍然存在并在行上使用正则表达式即可。
编辑
从集合导入defaultdict导入重新
def get_users(log):
f = open(log)
# Read header line
h = f.readline().strip().replace('\'', '').split('\t')
ix_profile = h.index('profile.type')
ix_user = h.index('profile.id')
assert ix_user < ix_profile
Run Code Online (Sandbox Code Playgroud)
此代码假设用户位于个人资料之前
keep_field = r'"([^"]*)"'
Run Code Online (Sandbox Code Playgroud)
该正则表达式将捕获单个列
skip_field = r'"[^"]*"'
Run Code Online (Sandbox Code Playgroud)
此正则表达式将匹配列,但不捕获结果。(注意没有括号)
fields = [skip_field] * len(h)
fields[ix_profile] = keep_field
fields[ix_user] = keep_field
Run Code Online (Sandbox Code Playgroud)
为所有字段创建一个列表,只保留我们关心的字段
del fields[max(ix_profile, ix_user)+1:]
Run Code Online (Sandbox Code Playgroud)
消除我们关心的字段之后的所有字段(它们需要时间来匹配,我们不关心它们)
regex = re.compile(r"\t".join(fields))
Run Code Online (Sandbox Code Playgroud)
实际上生成正则表达式。
users = defaultdict(int)
for line in f:
user, profile = regex.match(line).groups()
Run Code Online (Sandbox Code Playgroud)
取出两个值,并进行逻辑运算
if profile != "7": # "7" indicates a bad value
# use list slicing to remove quotes
users[user] += 1
f.close()
return users
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2367 次 |
| 最近记录: |