Arm*_*her 174 python file-io file tail logfiles
I am writing a log file viewer for a web application, and for that I want to paginate through the lines of the log file. The items in the file are line based, with the newest item at the bottom.

So I need a tail() method that can read n lines from the bottom and supports an offset. What I came up with looks like this:
def tail(f, n, offset=0):
    """Reads a n lines from f with an offset of offset lines."""
    avg_line_length = 74
    to_read = n + offset
    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops. apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None]
        avg_line_length *= 1.3
Is this a reasonable approach? What is the recommended way to tail a log file with an offset?
S.L*_*ott 119
This may be faster than yours. It makes no assumptions about line length. It backs through the file one block at a time until it has found the right number of '\n' characters.
def tail( f, lines=20 ):
    total_lines_wanted = lines

    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = []  # blocks of size BLOCK_SIZE, in reverse order starting
                 # from the end of the file
    while lines_to_go > 0 and block_end_byte > 0:
        if (block_end_byte - BLOCK_SIZE > 0):
            # read the last block we haven't yet read
            f.seek(block_number*BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            # file too small, start from beginning
            f.seek(0,0)
            # only read what was not read
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count('\n')
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = ''.join(reversed(blocks))
    return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])
I don't like the tricky assumption about line length when, as a practical matter, you can never know things like that.

Generally, this will locate the last 20 lines on the first or second pass through the loop. If your 74-character figure was actually accurate, you could make the block size 2048 and you'd tail 20 lines almost immediately.

Also, I don't burn a lot of brain calories trying to finesse alignment with physical OS blocks. Using these high-level I/O packages, I doubt you'll see any performance consequence from trying to align on OS block boundaries. If you use lower-level I/O, then you might see a speedup.
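A minimal usage sketch, assuming the Python 2-era text-mode semantics this answer was written for (on Python 3 the file would need to be opened in binary mode and the '\n' literals switched to bytes); "app.log" is just a placeholder path:

with open("app.log", "r") as f:
    # Print the last 20 lines of the hypothetical log file.
    print(tail(f, lines=20))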
Mar*_*ark 84
Assuming you are on a unix-like system, on Python 2 you can do:
import os

def tail(f, n, offset=0):
    # Shell out to the external `tail` command; the line count must be a string
    stdin, stdout = os.popen2("tail -n " + str(n + offset) + " " + f)
    stdin.close()
    lines = stdout.readlines()
    stdout.close()
    # Drop the trailing `offset` lines, if any
    return lines[:-offset] if offset else lines
For python 3 you may do:
import subprocess

def tail(f, n, offset=0):
    # `tail` arguments must be strings; request n + offset lines
    proc = subprocess.Popen(['tail', '-n', str(n + offset), f], stdout=subprocess.PIPE)
    lines = proc.stdout.readlines()
    # Drop the trailing `offset` lines, if any
    return lines[:-offset] if offset else lines
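On Python 3.7+, a slightly tidier variant (my sketch, not part of the original answer) can use subprocess.run with capture_output; it still relies on an external tail binary being on PATH:

import subprocess

def tail(filename, n, offset=0):
    # Sketch only: runs the external `tail` command and captures its output as text.
    result = subprocess.run(
        ['tail', '-n', str(n + offset), filename],
        capture_output=True, text=True, check=True,
    )
    lines = result.stdout.splitlines()
    return lines[:-offset] if offset else lines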
A. *_*ady 29
If reading the whole file is acceptable then use a deque.
from collections import deque
deque(f, maxlen=n)
Prior to 2.6, deques didn't have a maxlen option, but it is easy enough to implement.
import itertools
def maxque(items, size):
    items = iter(items)
    q = deque(itertools.islice(items, size))
    for item in items:
        del q[0]
        q.append(item)
    return q
If it's a requirement to read the file from the end, then use a gallop (a.k.a. exponential) search.
def tail(f, n):
    assert n >= 0
    pos, lines = n+1, []
    while len(lines) <= n:
        try:
            f.seek(-pos, 2)
        except IOError:
            f.seek(0)
            break
        finally:
            lines = list(f)
        pos *= 2
    return lines[-n:]
gle*_*bot 29
Here is my answer. Pure Python. Using timeit it seems pretty fast. Tailing 100 lines of a log file that has 100,000 lines:
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10)
0.0014600753784179688
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100)
0.00899195671081543
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=1000)
0.05842900276184082
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10000)
0.5394978523254395
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100000)
5.377126932144165
Here is the code:
import os

def tail(f, lines=1, _buffer=4098):
    """Tail a file and get X lines from the end"""
    # place holder for the lines found
    lines_found = []

    # block counter will be multiplied by buffer
    # to get the block size from the end
    block_counter = -1

    # loop until we find X lines
    while len(lines_found) < lines:
        try:
            f.seek(block_counter * _buffer, os.SEEK_END)
        except IOError:  # either file is too small, or too many lines requested
            f.seek(0)
            lines_found = f.readlines()
            break

        lines_found = f.readlines()

        # we found enough lines, get out
        # Removed this line because it was redundant the while will catch
        # it, I left it for history
        # if len(lines_found) > lines:
        #    break

        # decrement the block counter to get the
        # next X bytes
        block_counter -= 1

    return lines_found[-lines:]
pap*_*ane 25
S.Lott's answer above almost works for me, but it ended up giving me partial lines. It turns out that it corrupts data on block boundaries because data holds the read blocks in reversed order, so when ''.join(data) is called the blocks are in the wrong order. This fixes that.
def tail(f, window=20):
    """
    Returns the last `window` lines of file `f` as a list.
    f - a byte file-like object
    """
    if window == 0:
        return []
    BUFSIZ = 1024
    f.seek(0, 2)
    bytes = f.tell()
    size = window + 1
    block = -1
    data = []
    while size > 0 and bytes > 0:
        if bytes - BUFSIZ > 0:
            # Seek back one whole BUFSIZ
            f.seek(block * BUFSIZ, 2)
            # read BUFFER
            data.insert(0, f.read(BUFSIZ))
        else:
            # file too small, start from beginning
            f.seek(0,0)
            # only read what was not read
            data.insert(0, f.read(bytes))
        linesFound = data[0].count('\n')
        size -= linesFound
        bytes -= BUFSIZ
        block -= 1
    return ''.join(data).splitlines()[-window:]
Arm*_*her 20
The code I ended up using. I think this is the best so far:
def tail(f, n, offset=None):
    """Reads a n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops. apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None], \
                   len(lines) > to_read or pos > 0
        avg_line_length *= 1.3
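A minimal pagination sketch on top of this, with a placeholder path and page size; the file is opened in binary mode so the end-relative seek also works on Python 3, and the log is assumed to be larger than a few kilobytes:

with open("app.log", "rb") as f:
    # Second page of 20 lines: skip the newest 20 entries.
    lines, has_more = tail(f, 20, offset=20)
    print(b"\n".join(lines).decode("utf-8"))
    if has_more:
        print("... older entries available ...")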
dim*_*tri 13
Simple and fast solution with mmap:
import mmap
import os

def tail(filename, n):
    """Returns last n lines from the filename. No exception handling"""
    size = os.path.getsize(filename)
    with open(filename, "rb") as f:
        # for Windows the mmap parameters are different
        fm = mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ)
        try:
            for i in xrange(size - 1, -1, -1):
                if fm[i] == '\n':
                    n -= 1
                    if n == -1:
                        break
            return fm[i + 1 if i else 0:].splitlines()
        finally:
            fm.close()
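The comment about Windows above matters: mmap.PROT_READ and mmap.MAP_SHARED are Unix-only. A portable sketch of the same idea, using access=mmap.ACCESS_READ and Python 3 byte slicing (my adaptation, not the original answer):

import mmap
import os

def tail(filename, n):
    """Return the last n lines of filename as bytes (portable sketch, no handling of empty files)."""
    size = os.path.getsize(filename)
    with open(filename, "rb") as f, mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fm:
        i = 0
        # Scan backwards, counting newlines until n + 1 have been seen.
        for i in range(size - 1, -1, -1):
            if fm[i:i + 1] == b'\n':
                n -= 1
                if n == -1:
                    break
        return fm[i + 1 if i else 0:].splitlines()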
Posting an answer at the request of a commenter on my answer to a similar question, where the same technique was used to mutate the last line of a file, not just get it.

For a file of significant size, mmap is the best way to do this. To improve on the existing mmap answer, this version is portable between Windows and Linux, and should run faster (though it won't work without some modifications on 32-bit Python with files in the GB range; see the other answer for hints on handling this, and on modifying it to work on Python 2).
import io  # Gets consistent version of open for both Py2.7 and Py3.x
import itertools
import mmap

def skip_back_lines(mm, numlines, startidx):
    '''Factored out to simplify handling of n and offset'''
    for _ in itertools.repeat(None, numlines):
        startidx = mm.rfind(b'\n', 0, startidx)
        if startidx < 0:
            break
    return startidx

def tail(f, n, offset=0):
    # Reopen file in binary mode
    with io.open(f.name, 'rb') as binf, mmap.mmap(binf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # len(mm) - 1 handles files ending w/newline by getting the prior line
        startofline = skip_back_lines(mm, offset, len(mm) - 1)
        if startofline < 0:
            return []  # Offset lines consumed whole file, nothing to return
            # If using a generator function (yield-ing, see below),
            # this should be a plain return, no empty list

        endoflines = startofline + 1  # Slice end to omit offset lines

        # Find start of lines to capture (add 1 to move from newline to beginning of following line)
        startofline = skip_back_lines(mm, n, startofline) + 1

        # Passing True to splitlines makes it return the list of lines without
        # removing the trailing newline (if any), so list mimics f.readlines()
        return mm[startofline:endoflines].splitlines(True)
        # If Windows style \r\n newlines need to be normalized to \n, and input
        # is ASCII compatible, can normalize newlines with:
        # return mm[startofline:endoflines].replace(os.linesep.encode('ascii'), b'\n').splitlines(True)
This assumes the number of lines tailed is small enough that you can safely read them all into memory at once; you could also make this a generator function and manually read a line at a time by replacing the final line with:
        mm.seek(startofline)
        # Call mm.readline n times, or until EOF, whichever comes first
        # Python 3.2 and earlier:
        for line in itertools.islice(iter(mm.readline, b''), n):
            yield line

        # 3.3+:
        yield from itertools.islice(iter(mm.readline, b''), n)
Lastly, this reads in binary mode (necessary to use mmap), so it gives str lines (Py2) and bytes lines (Py3); if you want unicode (Py2) or str (Py3), the iterative approach could be tweaked to decode for you and/or fix newlines:
        lines = itertools.islice(iter(mm.readline, b''), n)
        if f.encoding:  # Decode if the passed file was opened with a specific encoding
            lines = (line.decode(f.encoding) for line in lines)
        if 'b' not in f.mode:  # Fix line breaks if passed file opened in text mode
            lines = (line.replace(os.linesep, '\n') for line in lines)

        # Python 3.2 and earlier:
        for line in lines:
            yield line

        # 3.3+:
        yield from lines
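A minimal usage sketch of the list-returning version above, assuming a hypothetical UTF-8 log file named app.log:

import io

# Hypothetical usage; tail() reopens f.name in binary mode itself,
# so the returned lines are bytes and are decoded here.
with io.open("app.log", "r", encoding="utf-8") as f:
    for raw in tail(f, 10, offset=5):
        print(raw.decode("utf-8"), end="")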
Note: I typed this all up on a machine where I lack access to Python for testing. Please let me know if I typoed anything; this was similar enough to my other answer that I think it should work, but the tweaks (e.g. handling offset) could lead to subtle errors. Please let me know in the comments if there are any mistakes.
小智 5
Updating the @papercrane solution to python3. Open the file with open(filename, 'rb') and:
def tail(f, window=20):
    """Returns the last `window` lines of file `f` as a list.
    """
    if window == 0:
        return []

    BUFSIZ = 1024
    f.seek(0, 2)
    remaining_bytes = f.tell()
    size = window + 1
    block = -1
    data = []

    while size > 0 and remaining_bytes > 0:
        if remaining_bytes - BUFSIZ > 0:
            # Seek back one whole BUFSIZ
            f.seek(block * BUFSIZ, 2)
            # read BUFFER
            bunch = f.read(BUFSIZ)
        else:
            # file too small, start from beginning
            f.seek(0, 0)
            # only read what was not read
            bunch = f.read(remaining_bytes)

        bunch = bunch.decode('utf-8')
        data.insert(0, bunch)
        size -= bunch.count('\n')
        remaining_bytes -= BUFSIZ
        block -= 1

    return ''.join(data).splitlines()[-window:]
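A minimal usage sketch following the note above (the path is a placeholder); the file must be opened in binary mode because the function decodes each block itself:

with open("app.log", "rb") as f:
    print("\n".join(tail(f, window=20)))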
小智 5
The simplest way is to use deque:
from collections import deque

def tail(filename, n=10):
    with open(filename) as f:
        return deque(f, n)
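Since the question also asks for an offset, a hedged extension of this idea (my sketch, not part of the answer) keeps n + offset lines and slices off the newest offset lines:

from collections import deque

def tail(filename, n=10, offset=0):
    # Buffer the last n + offset lines, then drop the trailing `offset` lines.
    with open(filename) as f:
        lines = list(deque(f, n + offset))
    return lines[:-offset] if offset else lines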