您可以将数据收集到dictIP是密钥的位置,值包含给定IP的时间戳.然后,每次添加时间戳时,您都可以检查给定的IP是否在一秒内有三个时间戳:
from datetime import datetime, timedelta
from collections import defaultdict, deque
import re
THRESHOLD = timedelta(seconds=1)
COUNT = 3
res = set()
d = defaultdict(deque)
with open('test.txt') as f:
for line in f:
# Capture IP and timestamp
m = re.match(r'(\S*)[^\[]*\[(\S*)', line)
ip, dt = m.groups()
# Parse timestamp
dt = datetime.strptime(dt, '%d/%b/%Y:%H:%M:%S:%f')
# Remove timestamps from deque if they are older than threshold
que = d[ip]
while que and (dt - que[0]) > THRESHOLD:
que.popleft()
# Add timestamp, update result if there's 3 or more items
que.append(dt)
if len(que) >= COUNT:
res.add(ip)
print(res)
Run Code Online (Sandbox Code Playgroud)
结果:
{'28.252.89.140'}
Run Code Online (Sandbox Code Playgroud)
上面逐行读取包含日志的日志文件.对于每一行,正则表达式用于捕获两组数据:IP和时间戳.然后strptime用于解析时间.
第一组(\S*)捕获除空白之外的所有内容.然后[^\[]*捕获除了之外的所有内容[并\[在时间戳之前捕获最终字符.最后(\S*)再次使用捕获所有内容直到下一个空格.请参阅regex101上的示例.
一旦我们有了IP和时间,就会将它们添加到defaultdict使用IP作为密钥的位置,并且值是deque时间戳.在添加新时间戳之前,如果它们早于旧时间戳,则会删除旧时间戳THRESHOLD.这假设日志行已按时间排序.添加后,检查长度,如果COUNT队列中有多个项目,则将IP添加到结果集中.
| 归档时间: |
|
| 查看次数: |
345 次 |
| 最近记录: |