优化这个python日志解析代码

Jas*_*ram 6 python optimization multithreading logfiles multiprocessing

我的笔记本电脑上用于4.2 GB输入文件的此代码的运行时间为48秒.输入文件以制表符分隔,每个值都显示在引号中.每条记录以换行符结尾,例如'"val1"\t"val2"\t"val3"\t..."valn"\n'

我尝试过使用10个线程进行多处理:一个用于排队,一个用于解析各行并填充一个输出队列,一个用于将输出队列缩减为如下所示的defaultdict,但代码需要300秒才能运行,比以下时间长6倍以上:

from collections import defaultdict
def get_users(log):
    users = defaultdict(int)
    f = open(log)
    # Read header line
    h = f.readline().strip().replace('"', '').split('\t')
    ix_profile = h.index('profile.type')
    ix_user = h.index('profile.id')
    # If either ix_* is the last field in h, it will include a newline. 
    # That's fine for now.
    for (i, line) in enumerate(f): 
        if i % 1000000 == 0: print "Line %d" % i # progress notification

        l = line.split('\t')
        if l[ix_profile] != '"7"': # "7" indicates a bad value
            # use list slicing to remove quotes
            users[l[ix_user][1:-1]] += 1 

    f.close()
    return users
Run Code Online (Sandbox Code Playgroud)

我已经通过从for循环删除除了print语句之外的所有内容来检查我是不是受I/O约束.该代码在9秒内运行,我将考虑这个代码运行速度的下限.

我有很多这5 GB文件要处理,所以即使是运行时的一个很小的改进(我知道,我可以删除打印!)将有所帮助.我运行的机器有4个内核,所以我不禁想知道是否有办法让多线程/多进程代码比上面的代码运行得更快.

更新:

我重写了多处理代码如下:

from multiprocessing import Pool, cpu_count
from collections import defaultdict

def parse(line, ix_profile=10, ix_user=9):
    """ix_profile and ix_user predetermined; hard-coding for expedience."""
    l = line.split('\t')
    if l[ix_profile] != '"7"':
        return l[ix_user][1:-1]

def get_users_mp():
    f = open('20110201.txt')
    h = f.readline() # remove header line
    pool = Pool(processes=cpu_count())
    result_iter = pool.imap_unordered(parse, f, 100)
    users = defaultdict(int)
    for r in result_iter:
        if r is not None:
            users[r] += 1
    return users
Run Code Online (Sandbox Code Playgroud)

它运行26秒,加速1.85倍.不错,但有4个核心,没有我想象的那么多.

Win*_*ert 4

使用正则表达式。

测试确定该过程中最昂贵的部分是对 str.split() 的调用。可能必须为每一行构造一个列表和一堆字符串对象,成本很高。

首先,您需要构造一个正则表达式来匹配该行。就像是:

expression = re.compile(r'("[^"]")\t("[^"]")\t')
Run Code Online (Sandbox Code Playgroud)

如果您调用 expression.match(line).groups(),您将获得前两列提取为两个字符串对象,并且您可以直接对它们进行逻辑处理。

现在假设感兴趣的两列是前两列。如果没有,您只需调整正则表达式以匹配正确的列。您的代码检查标题以查看列所在的位置。您可以基于此生成正则表达式,但我猜这些列实际上总是位于同一位置。只需验证它们是否仍然存在并在行上使用正则表达式即可。

编辑

从集合导入defaultdict导入重新

def get_users(log):
    f = open(log)
    # Read header line
    h = f.readline().strip().replace('\'', '').split('\t')
    ix_profile = h.index('profile.type')
    ix_user = h.index('profile.id')

    assert ix_user < ix_profile
Run Code Online (Sandbox Code Playgroud)

此代码假设用户位于个人资料之前

    keep_field = r'"([^"]*)"'
Run Code Online (Sandbox Code Playgroud)

该正则表达式将捕获单个列

    skip_field = r'"[^"]*"'
Run Code Online (Sandbox Code Playgroud)

此正则表达式将匹配列,但不捕获结果。(注意没有括号)

    fields = [skip_field] * len(h)
    fields[ix_profile] = keep_field
    fields[ix_user] = keep_field
Run Code Online (Sandbox Code Playgroud)

为所有字段创建一个列表,只保留我们关心的字段

    del fields[max(ix_profile, ix_user)+1:]
Run Code Online (Sandbox Code Playgroud)

消除我们关心的字段之后的所有字段(它们需要时间来匹配,我们不关心它们)

    regex = re.compile(r"\t".join(fields))
Run Code Online (Sandbox Code Playgroud)

实际上生成正则表达式。

    users = defaultdict(int)
    for line in f:
        user, profile = regex.match(line).groups()
Run Code Online (Sandbox Code Playgroud)

取出两个值,并进行逻辑运算

        if profile != "7": # "7" indicates a bad value
            # use list slicing to remove quotes
            users[user] += 1 

    f.close()
    return users
Run Code Online (Sandbox Code Playgroud)