使用Python获取文件的最后n行,类似于tail

Arm*_*her 174 python file-io file tail logfiles

我正在为Web应用程序编写一个日志文件查看器,为此我想通过日志文件的行分页.文件中的项目是基于行的,底部是最新项目.

所以我需要一种tail()方法,可以n从底部读取行并支持偏移量.我想出的是这样的:

def tail(f, n, offset=0):
    """Reads a n lines from f with an offset of offset lines."""
    avg_line_length = 74
    to_read = n + offset
    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None]
        avg_line_length *= 1.3
Run Code Online (Sandbox Code Playgroud)

这是一种合理的方法吗?使用偏移量尾部日志文件的推荐方法是什么?

S.L*_*ott 119

这可能比你的更快.不对线长做出假设.一次一个块地返回文件,直到找到正确数量的'\n'字符.

def tail( f, lines=20 ):
    total_lines_wanted = lines

    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
                # from the end of the file
    while lines_to_go > 0 and block_end_byte > 0:
        if (block_end_byte - BLOCK_SIZE > 0):
            # read the last block we haven't yet read
            f.seek(block_number*BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            # file too small, start from begining
            f.seek(0,0)
            # only read what was not read
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count('\n')
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = ''.join(reversed(blocks))
    return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])
Run Code Online (Sandbox Code Playgroud)

我不喜欢关于线长的棘手假设 - 作为一个实际问题 - 你永远不会知道这样的事情.

通常,这将在第一次或第二次通过循环时找到最后20行.如果你的74个角色实际上是准确的,你可以使块大小为2048,你几乎可以立即尾随20行.

此外,我没有燃烧大量的脑热量,试图巧妙地与物理OS块对齐.使用这些高级I/O包,我怀疑你会看到尝试在OS块边界上对齐的任何性能后果.如果您使用较低级别的I/O,那么您可能会看到加速.

  • 这在小日志文件上失败 - IOError:无效参数 - f.seek(块*1024,2) (12认同)
  • 请勿使用此代码.它破坏了python 2.7中某些边界情况下的行.以下@papercrane的答案解决了这个问题. (6认同)
  • 不再适用于python 3.2.我得到`io.UnsupportedOperation:不能做非零的终端相对搜索`我可以将偏移更改为0,但这会破坏函数的用途. (5认同)
  • @DavidEnglund原因是[这里](http://www.velocityreviews.com/forums/t748976-python-3-2-bug-reading-the-last-line-of-a-file.html).简而言之:在文本模式下不允许相对于文件末尾的搜索,大概是因为文件内容必须被解码,并且一般来说,在编码字节序列中寻找任意位置时可能会有不确定的结果尝试从该位置开始解码为Unicode.该链接提供的建议是尝试以二进制模式打开文件并自己进行解码,捕获DecodeError异常. (4认同)

Mar*_*ark 84

假设你可以在Python 2上使用类似unix的系统:

import os
def tail(f, n, offset=0):
  stdin,stdout = os.popen2("tail -n "+n+offset+" "+f)
  stdin.close()
  lines = stdout.readlines(); stdout.close()
  return lines[:,-offset]
Run Code Online (Sandbox Code Playgroud)

对于python 3,您可以这样做:

import subprocess
def tail(f, n, offset=0):
    proc = subprocess.Popen(['tail', '-n', n + offset, f], stdout=subprocess.PIPE)
    lines = proc.stdout.readlines()
    return lines[:, -offset]
Run Code Online (Sandbox Code Playgroud)

  • 问题并不是说平台依赖是不可接受的.我不明白为什么这应该得到两个downvotes,当它提供一个非常unixy(可能是你正在寻找的......当然是对我来说)的方式正是在做问题所要求的. (40认同)
  • 您可能希望预先计算偏移量,如:`offset_total = str(n + offset)`并将此行替换为`stdin,stdout = os.popen2("tail -n"+ offset_total +""+ f)`以避免`TypeErrors(无法连接int + str)` (6认同)
  • 应该是平台独立的.此外,如果您阅读该问题,您将看到f是一个类似对象的文件. (5认同)
  • 谢谢,我以为我必须用纯Python来解决这个问题,但没有理由不使用UNIX实用程序,所以我选择了这个.现代Python中的FWIW,subprocess.check_output可能比os.popen2更可取; 它简化了一些事情,因为它只是将输出作为字符串返回,并引发非零退出代码. (3认同)
  • 虽然这是依赖于平台的,但它是一种非常有效的方式来做所谓的事情,并且是一种非常快速的方法(你不必将整个文件加载到内存中).@Shabbyrobe (3认同)
  • 考虑`tail('file.txt; rm -rf /',10)`(**不要打电话给**) (3认同)
  • @Mark更新可能会很好,因为popen2自pyton2.6以来已被弃用 (2认同)
  • python3: 之前 --> `proc = subprocess.Popen(['tail', '-n', n + offset, f], stdout=subprocess.PIPE)` 之后 --> `proc = subprocess.Popen([' tail', '-n', '\"%s\"' % (n + offset), file_path], stdout=subprocess.PIPE)` 防止错误 (2认同)

A. *_*ady 29

如果读取整个文件是可以接受的,那么使用双端队列.

from collections import deque
deque(f, maxlen=n)
Run Code Online (Sandbox Code Playgroud)

在2.6之前,deques没有maxlen选项,但它很容易实现.

import itertools
def maxque(items, size):
    items = iter(items)
    q = deque(itertools.islice(items, size))
    for item in items:
        del q[0]
        q.append(item)
    return q
Run Code Online (Sandbox Code Playgroud)

如果要求从最后读取文件,则使用驰骋(又称指数)搜索.

def tail(f, n):
    assert n >= 0
    pos, lines = n+1, []
    while len(lines) <= n:
        try:
            f.seek(-pos, 2)
        except IOError:
            f.seek(0)
            break
        finally:
            lines = list(f)
        pos *= 2
    return lines[-n:]
Run Code Online (Sandbox Code Playgroud)

  • @2mac [指数搜索](https://en.wikipedia.org/wiki/Exponential_search)。它从文件末尾迭代读取,每次读取的数量加倍,直到找到足够的行。 (2认同)

gle*_*bot 29

这是我的答案.纯蟒蛇.使用timeit似乎很快.拖尾100行有100,000行的日志文件:

>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10)
0.0014600753784179688
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100)
0.00899195671081543
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=1000)
0.05842900276184082
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10000)
0.5394978523254395
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100000)
5.377126932144165
Run Code Online (Sandbox Code Playgroud)

这是代码:

import os


def tail(f, lines=1, _buffer=4098):
    """Tail a file and get X lines from the end"""
    # place holder for the lines found
    lines_found = []

    # block counter will be multiplied by buffer
    # to get the block size from the end
    block_counter = -1

    # loop until we find X lines
    while len(lines_found) < lines:
        try:
            f.seek(block_counter * _buffer, os.SEEK_END)
        except IOError:  # either file is too small, or too many lines requested
            f.seek(0)
            lines_found = f.readlines()
            break

        lines_found = f.readlines()

        # we found enough lines, get out
        # Removed this line because it was redundant the while will catch
        # it, I left it for history
        # if len(lines_found) > lines:
        #    break

        # decrement the block counter to get the
        # next X bytes
        block_counter -= 1

    return lines_found[-lines:]
Run Code Online (Sandbox Code Playgroud)

  • 优雅的解决方 `if len(lines_found)>行:`真的有必要吗?`loop`条件不会也能抓住它吗? (3认同)
  • @MaximilianPeters是的。这不是必需的。我把它注释掉了。 (2认同)

pap*_*ane 25

S.Lott上面的答案几乎对我有用,但最终给了我部分线条.事实证明它破坏了块边界上的数据,因为数据以相反的顺序保持读取块.当调用'.join(数据)时,块的顺序错误.这解决了这个问题.

def tail(f, window=20):
    """
    Returns the last `window` lines of file `f` as a list.
    f - a byte file-like object
    """
    if window == 0:
        return []
    BUFSIZ = 1024
    f.seek(0, 2)
    bytes = f.tell()
    size = window + 1
    block = -1
    data = []
    while size > 0 and bytes > 0:
        if bytes - BUFSIZ > 0:
            # Seek back one whole BUFSIZ
            f.seek(block * BUFSIZ, 2)
            # read BUFFER
            data.insert(0, f.read(BUFSIZ))
        else:
            # file too small, start from begining
            f.seek(0,0)
            # only read what was not read
            data.insert(0, f.read(bytes))
        linesFound = data[0].count('\n')
        size -= linesFound
        bytes -= BUFSIZ
        block -= 1
    return ''.join(data).splitlines()[-window:]
Run Code Online (Sandbox Code Playgroud)


Arm*_*her 20

我最终使用的代码.我认为这是迄今为止最好的:

def tail(f, n, offset=None):
    """Reads a n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None], \
                   len(lines) > to_read or pos > 0
        avg_line_length *= 1.3
Run Code Online (Sandbox Code Playgroud)

  • 没有完全回答这个问题. (4认同)

dim*_*tri 13

使用mmap简单快速的解决方案:

import mmap
import os

def tail(filename, n):
    """Returns last n lines from the filename. No exception handling"""
    size = os.path.getsize(filename)
    with open(filename, "rb") as f:
        # for Windows the mmap parameters are different
        fm = mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ)
        try:
            for i in xrange(size - 1, -1, -1):
                if fm[i] == '\n':
                    n -= 1
                    if n == -1:
                        break
            return fm[i + 1 if i else 0:].splitlines()
        finally:
            fm.close()
Run Code Online (Sandbox Code Playgroud)


Sha*_*ger 5

我对类似问题的回答中应评论者的要求发布答案,其中使用相同的技术来改变文件的最后一行,而不仅仅是获取它。

对于相当大的文件,这mmap是执行此操作的最佳方法。为了改进现有的mmap答案,这个版本在 Windows 和 Linux 之间是可移植的,并且应该运行得更快(尽管它不会在 32 位 Python 上没有一些修改的情况下工作,文件在 GB 范围内,请参阅其他答案以获取有关处理此问题的提示,以及修改以在 Python 2 上工作)。

import io  # Gets consistent version of open for both Py2.7 and Py3.x
import itertools
import mmap

def skip_back_lines(mm, numlines, startidx):
    '''Factored out to simplify handling of n and offset'''
    for _ in itertools.repeat(None, numlines):
        startidx = mm.rfind(b'\n', 0, startidx)
        if startidx < 0:
            break
    return startidx

def tail(f, n, offset=0):
    # Reopen file in binary mode
    with io.open(f.name, 'rb') as binf, mmap.mmap(binf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # len(mm) - 1 handles files ending w/newline by getting the prior line
        startofline = skip_back_lines(mm, offset, len(mm) - 1)
        if startofline < 0:
            return []  # Offset lines consumed whole file, nothing to return
            # If using a generator function (yield-ing, see below),
            # this should be a plain return, no empty list

        endoflines = startofline + 1  # Slice end to omit offset lines

        # Find start of lines to capture (add 1 to move from newline to beginning of following line)
        startofline = skip_back_lines(mm, n, startofline) + 1

        # Passing True to splitlines makes it return the list of lines without
        # removing the trailing newline (if any), so list mimics f.readlines()
        return mm[startofline:endoflines].splitlines(True)
        # If Windows style \r\n newlines need to be normalized to \n, and input
        # is ASCII compatible, can normalize newlines with:
        # return mm[startofline:endoflines].replace(os.linesep.encode('ascii'), b'\n').splitlines(True)
Run Code Online (Sandbox Code Playgroud)

这假设尾部的行数足够小,您可以一次安全地将它们全部读入内存;您还可以将其设为生成器函数,并通过将最后一行替换为以下内容来一次手动读取一行:

        mm.seek(startofline)
        # Call mm.readline n times, or until EOF, whichever comes first
        # Python 3.2 and earlier:
        for line in itertools.islice(iter(mm.readline, b''), n):
            yield line

        # 3.3+:
        yield from itertools.islice(iter(mm.readline, b''), n)
Run Code Online (Sandbox Code Playgroud)

最后,这个以二进制模式读取(必须使用mmap),所以它给出了str行(Py2)和bytes行(Py3);如果你想要unicode(Py2) 或str(Py3),可以调整迭代方法来为你解码和/或修复换行符:

        lines = itertools.islice(iter(mm.readline, b''), n)
        if f.encoding:  # Decode if the passed file was opened with a specific encoding
            lines = (line.decode(f.encoding) for line in lines)
        if 'b' not in f.mode:  # Fix line breaks if passed file opened in text mode
            lines = (line.replace(os.linesep, '\n') for line in lines)
        # Python 3.2 and earlier:
        for line in lines:
            yield line
        # 3.3+:
        yield from lines
Run Code Online (Sandbox Code Playgroud)

注意:我在一台无法访问 Python 进行测试的机器上输入了这些内容。如果我打错了什么,请告诉我;这与我认为它应该有效的其他答案非常相似,但是调整(例如处理)可能会导致细微的错误。如果有任何错误,请在评论中告诉我。offset


小智 5

将@papercrane 解决方案更新为 python3。使用open(filename, 'rb')和打开文件:

def tail(f, window=20):
    """Returns the last `window` lines of file `f` as a list.
    """
    if window == 0:
        return []

    BUFSIZ = 1024
    f.seek(0, 2)
    remaining_bytes = f.tell()
    size = window + 1
    block = -1
    data = []

    while size > 0 and remaining_bytes > 0:
        if remaining_bytes - BUFSIZ > 0:
            # Seek back one whole BUFSIZ
            f.seek(block * BUFSIZ, 2)
            # read BUFFER
            bunch = f.read(BUFSIZ)
        else:
            # file too small, start from beginning
            f.seek(0, 0)
            # only read what was not read
            bunch = f.read(remaining_bytes)

        bunch = bunch.decode('utf-8')
        data.insert(0, bunch)
        size -= bunch.count('\n')
        remaining_bytes -= BUFSIZ
        block -= 1

    return ''.join(data).splitlines()[-window:]
Run Code Online (Sandbox Code Playgroud)


小智 5

最简单的方法是使用deque

from collections import deque

def tail(filename, n=10):
    with open(filename) as f:
        return deque(f, n)
Run Code Online (Sandbox Code Playgroud)

  • 这将迭代整个文件。如果您正在处理大文件,请记住这一点。 (4认同)