Task: check whether a given passport series and number exist in a file.
My solution so far looks like this:
import csv
import datetime


def check_passport(filename, series: str, number: str) -> dict:
    """
    Find passport number and series
    :param filename: csv filename path
    :param series: passport series
    :param number: passport number
    :return: dict with 'result' and 'message'
    """
    print(f'series={series}, number={number}')
    find = False
    start = datetime.datetime.now()
    with open(filename, 'r', encoding='utf_8_sig') as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        try:
            for row in reader:
                if row[0] == series and row[1] == number:
                    print(row[0])
                    print(row[1])
                    find = True
                    break
        except Exception as e:
            print(e)
    print(datetime.datetime.now() - start)
    if find:
        return {'result': True, 'message': 'Passport found'}
    else:
        return {'result': False, 'message': 'Passport not found in Database'}
Here is part of the CSV file:
PASSP_SERIES,PASSP_NUMBER
3604,015558
6003,711925
6004,461914
6001,789369
The worst case is when the passport is not in the file, because then every line has to be checked. My best time so far is 53 seconds.
Answer (Dar*_*ylG):
I benchmarked three approaches.
Summary: using Pandas is the slowest approach. Considering observations reported elsewhere (namely that Pandas is one of the slower ways to read CSV files because of its overhead), this is not surprising. The fastest approach is to process the file as raw text and look for the numbers in the raw text (roughly 2x faster than the originally posted method using the CSV reader). Pandas was about 30% slower than the originally posted method.
Test code:
import timeit
import time
import random
import numpy as np
import pandas as pd
import csv
import matplotlib.pyplot as plt
import math
import itertools


def wrapper(func, *args, **kwargs):
    " Produces a 0-argument function so timeit can call it "
    # Reference https://www.pythoncentral.io/time-a-python-function/
    def wrapped():
        return func(*args, **kwargs)
    return wrapped
def method_line_by_line(filename, series: str, number: str) -> dict:
    """
    Find passport number and series
    :param filename: csv filename path
    :param series: passport series
    :param number: passport number
    :return:
    """
    find = False
    with open(filename, 'r', encoding='utf_8_sig') as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        try:
            for row in reader:
                if row[0] == series and row[1] == number:
                    find = True
                    break
        except Exception:
            pass
    if find:
        return {'result': True, 'message': 'Passport found'}
    else:
        return {'result': False, 'message': 'Passport not found in Database'}
def method_raw_text(filename, series: str, number: str) -> dict:
    """
    Find passport number and series by iterating through text records
    :param filename: csv filename path
    :param series: passport series
    :param number: passport number
    :return:
    """
    pattern = series + "," + number
    with open(filename, 'r', encoding='utf_8_sig') as csvfile:
        # iterate through the raw text lines looking for an exact match
        if any(map(lambda x: pattern == x.rstrip(), csvfile)):
            return {'result': True, 'message': 'Passport found'}
        else:
            return {'result': False, 'message': 'Passport not found in Database'}
def method_pandas_chunks(filename, series: str, number: str) -> dict:
    """
    Find passport number and series using Pandas in chunks
    :param filename: csv filename path
    :param series: passport series
    :param number: passport number
    :return:
    """
    chunksize = 10 ** 5
    for df in pd.read_csv(filename, chunksize=chunksize,
                          dtype={'PASSP_SERIES': str, 'PASSP_NUMBER': str}):
        df_search = df[(df['PASSP_SERIES'] == series) & (df['PASSP_NUMBER'] == number)]
        if not df_search.empty:
            break
    if not df_search.empty:
        return {'result': True, 'message': 'Passport found'}
    else:
        return {'result': False, 'message': 'Passport not found in Database'}
def generate_data(filename, number_records):
    " Generates random data for tests "
    df = pd.DataFrame(np.random.randint(0, 1e6, size=(number_records, 2)),
                      columns=['PASSP_SERIES', 'PASSP_NUMBER'])
    df.to_csv(filename, index=None, header=True)
    return df
def profile():
    Nls = [x for x in range(10000000, 30000001, 5000000)]  # range of number of test rows
    number_iterations = 3  # repeats per test
    methods = [method_line_by_line, method_raw_text, method_pandas_chunks]
    time_methods = [[] for _ in range(len(methods))]
    for N in Nls:
        # generate a CSV file with N rows
        generate_data('test.csv', N)
        for i, func in enumerate(methods):
            # use series 'x' and number 'y' to ensure we process the entire
            # file without finding a match
            wrapped = wrapper(func, 'test.csv', 'x', 'y')
            time_methods[i].append(math.log(timeit.timeit(wrapped, number=number_iterations)))
    markers = itertools.cycle(('o', '+', '.'))
    colors = itertools.cycle(('r', 'b', 'g'))
    labels = itertools.cycle(('line-by-line', 'raw-text', 'pandas'))
    print(time_methods)
    for i in range(len(time_methods)):
        plt.plot(Nls, time_methods[i], marker=next(markers), color=next(colors),
                 linestyle='-', label=next(labels))
    plt.xlabel('list size', fontsize=18)
    plt.ylabel('log(time)', fontsize=18)
    plt.legend(loc='upper left')
    plt.show()


# Run test
profile()
CSV is a convenient and simple file format.
It is not made for analysis or fast searching; that was never its goal. It is good for exchanging data between different applications and tasks where you have to process all entries, or where the amount of data is not very large.
If you want to speed things up, you should read the CSV file once, convert it into a database (for example sqlite), and then perform all searches in the database. If the passport numbers are unique, you could even use a simple dbm file or a Python shelve.
Database performance can be optimized by adding an index on the fields you search on.
It all depends on how often the CSV file changes and how often searches are performed, but normally this approach should give much better results.
I never really used pandas, but perhaps it is more performant at searching/filtering, though it will never beat searching in a real database.
If you want to go the sqlite or dbm route, I can help you write some code.
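For illustration, a minimal sketch of the sqlite route (the table, column and file names are my own placeholders, not from the original post): the CSV is imported once, an index is created on the two searched columns, and every later lookup becomes a single indexed query.

import csv
import sqlite3


def build_db(csv_name="passports.csv", db_name="passports.db"):
    """ one-time import of the CSV file into an indexed sqlite table """
    con = sqlite3.connect(db_name)
    con.execute("CREATE TABLE IF NOT EXISTS passports (series TEXT, number TEXT)")
    with open(csv_name, "r", encoding="utf_8_sig") as f:
        reader = csv.reader(f)
        next(reader)                          # skip the header line
        con.executemany("INSERT INTO passports VALUES (?, ?)", reader)
    # the index is what makes the later lookups fast
    con.execute("CREATE INDEX IF NOT EXISTS idx_series_number "
                "ON passports (series, number)")
    con.commit()
    con.close()


def check_passport_db(series: str, number: str, db_name="passports.db") -> dict:
    """ indexed lookup, typically answering within milliseconds """
    con = sqlite3.connect(db_name)
    row = con.execute(
        "SELECT 1 FROM passports WHERE series = ? AND number = ? LIMIT 1",
        (series, number)).fetchone()
    con.close()
    if row:
        return {'result': True, 'message': 'Passport found'}
    return {'result': False, 'message': 'Passport not found in Database'}

If the series/number pairs are unique, a dbm file or a shelve keyed on something like series + "," + number would work along the same lines.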
Addendum (bisect search in a sorted csv file before reading it with the csv reader):
There is another approach if the first field in the csv file is the series (or if you are willing to convert the csv file so that it can be sorted with GNU sort).
Just sort the file (easy to do with GNU sort on a Linux system; it can sort huge files without "exploding" the memory), and sorting should not take much longer than the single search you have at the moment.
Then use a bisect search in the file to find the first line with the right series, and continue with your existing function with a small modification.
This gives you the result within milliseconds. I tried it with a randomly created csv file with 30 million entries and a size of about 1.5 GB.
If you are running on a Linux system you could even change the code so that whenever the downloaded csv file changes, a sorted copy of it is created (sorting took about 1 to 2 minutes on my machine), so after two or three searches per week it already pays off.
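Just to illustrate the sorting step, a minimal sketch (assuming a Linux system with GNU coreutils on the PATH; the file names passports.csv / passports_sorted.csv are placeholders): the header line is kept on top, and LC_ALL=C forces plain byte ordering so that the sorted order matches the string comparisons used in the bisect code below.

import subprocess

SRC = "passports.csv"
DST = "passports_sorted.csv"

# copy the header line unchanged; the data lines are appended afterwards
with open(SRC, "r", encoding="utf_8_sig") as fin, \
        open(DST, "w", encoding="utf_8") as fout:
    fout.write(fin.readline())

# sort the remaining lines by series, then number, and append them to DST
subprocess.run(
    f"tail -n +2 {SRC} | LC_ALL=C sort -t, -k1,1 -k2,2 >> {DST}",
    shell=True, check=True,
)

The bisect search on the sorted file then looks like this: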
import csv
import datetime
import os


def get_line_at_pos(fin, pos):
    """ fetches the first complete line at offset pos,
        always skipping the (possibly partial) line at pos
    """
    fin.seek(pos)
    skip = fin.readline()
    # next line for debugging only
    # print("Skip@%d: %r" % (pos, skip))
    npos = fin.tell()
    assert pos + len(skip) == npos
    line = fin.readline()
    return npos, line
def bisect_seek(fname, field_func, field_val):
    """ returns a file position which guarantees that you will
        encounter all lines that might contain field_val,
        provided the file is ordered by field_val.
        field_func is the function to extract field_val from a line.
        The search is a bisect search with a complexity of log(n).
    """
    size = os.path.getsize(fname)
    minpos, maxpos, cur = 0, size, int(size / 2)
    with open(fname) as fin:
        small_pos = 0
        # next lines just for debugging
        state = "?"
        prev_pos = -1
        while True:  # find first id smaller than the one we search
            # next line just for debugging
            pos_str = "%s %10d %10d %10d" % (state, minpos, cur, maxpos)
            realpos, line = get_line_at_pos(fin, cur)
            val = field_func(line)
            # next line just for debugging
            pos_str += "# got @%d: %r %r" % (realpos, val, line)
            if val >= field_val:
                state = ">"
                maxpos = cur
                cur = int((minpos + cur) / 2)
            else:
                state = "<"
                minpos = cur
                cur = int((cur + maxpos) / 2)
            # next line just for debugging
            # print(pos_str)
            if prev_pos == cur:
                break
            prev_pos = cur
    return realpos
def getser(line):
    return line.split(",")[0]


def check_passport(filename, series: str, number: str) -> dict:
    """
    Find passport number and series
    :param filename: csv filename path
    :param series: passport series
    :param number: passport number
    :return:
    """
    print(f'series={series}, number={number}')
    found = False
    row = None
    start = datetime.datetime.now()
    # find the position from which we should start searching
    pos = bisect_seek(filename, getser, series)
    with open(filename, 'r', encoding='utf_8_sig') as csvfile:
        csvfile.seek(pos)
        reader = csv.reader(csvfile, delimiter=',')
        try:
            for row in reader:
                if row[0] == series and row[1] == number:
                    found = True
                    break
                elif row[0] > series:
                    # as the file is sorted we know we can abort now
                    break
        except Exception as e:
            print(e)
    print(datetime.datetime.now() - start)
    if found:
        print("good row", row)
        return {'result': True, 'message': 'Passport found'}
    else:
        print("bad row", row)
        return {'result': False, 'message': 'Passport not found in Database'}
Addendum 2019-11-30: Here is a script that splits a big file into smaller chunks and sorts each chunk. (I did not want to implement a full merge sort, because in this context searching each chunk is already efficient enough. If you are interested in more, I suggest trying to implement a merge sort yourself, or posting a question about sorting huge files with Python under Windows.)
split_n_sort_csv.py:
import itertools
import sys
import time


def main():
    args = sys.argv[1:]
    t = t0 = time.time()
    with open(args[0]) as fin:
        headline = next(fin)
        for idx in itertools.count():
            print(idx, "r")
            tprev = t
            lines = list(itertools.islice(fin, 10000000))
            t = time.time()
            t_read = t - tprev
            tprev = t
            print("s")
            lines.sort()
            t = time.time()
            t_sort = t - tprev
            tprev = t
            print("w")
            with open("bla_%03d.csv" % idx, "w") as fout:
                fout.write(headline)
                for line in lines:
                    fout.write(line)
            t = time.time()
            t_write = t - tprev
            tprev = t
            print("%4.1f %4.1f %4.1f" % (t_read, t_sort, t_write))
            if not lines:
                break
    t = time.time()
    print("Total of %5.1fs" % (t - t0))


if __name__ == "__main__":
    main()
And here is a modified version that searches through all of the chunk files.
import csv
import datetime
import itertools
import os

ENCODING = 'utf_8_sig'


def get_line_at_pos(fin, pos, enc_encoding="utf_8"):
    """ fetches the first complete line at offset pos,
        always skipping the (possibly partial) line at pos
    """
    while True:
        fin.seek(pos)
        try:
            skip = fin.readline()
            break
        except UnicodeDecodeError:
            # seek landed in the middle of a multi-byte character, retry
            pos += 1
    # print("Skip@%d: %r" % (pos, skip))
    npos = fin.tell()
    assert pos + len(skip.encode(enc_encoding)) == npos
    line = fin.readline()
    return npos, line


def bisect_seek(fname, field_func, field_val, encoding=ENCODING):
    size = os.path.getsize(fname)
    vmin, vmax, cur = 0, size, int(size / 2)
    if encoding.endswith("_sig"):
        enc_encoding = encoding[:-4]
    else:
        enc_encoding = encoding
    with open(fname, encoding=encoding) as fin:
        small_pos = 0
        state = "?"
        prev_pos = -1
        while True:  # find first id smaller than the one we search
            # next line only for debugging
            pos_str = "%s %10d %10d %10d" % (state, vmin, cur, vmax)
            realpos, line = get_line_at_pos(fin, cur, enc_encoding=enc_encoding)
            val = field_func(line)
            # next line only for debugging
            pos_str += "# got @%d: %r %r" % (realpos, val, line)
            if val >= field_val:
                state = ">"
                vmax = cur
                cur = int((vmin + cur) / 2)
            else:
                state = "<"
                vmin = cur
                cur = int((cur + vmax) / 2)
            # next line only for debugging
            # print(pos_str)
            if prev_pos == cur:
                break
            prev_pos = cur
    return realpos


def getser(line):
    return line.split(",")[0]


def check_passport(filename, series: str, number: str,
                   encoding=ENCODING) -> dict:
    """
    Find passport number and series
    :param filename: csv filename pattern, e.g. "bla_%03d.csv"
    :param series: passport series
    :param number: passport number
    :return:
    """
    print(f'series={series}, number={number}')
    found = False
    row = None
    start = datetime.datetime.now()
    for ctr in itertools.count():
        fname = filename % ctr
        if not os.path.exists(fname):
            break
        print(fname)
        pos = bisect_seek(fname, getser, series)
        with open(fname, 'r', encoding=encoding) as csvfile:
            csvfile.seek(pos)
            reader = csv.reader(csvfile, delimiter=',')
            try:
                for row in reader:
                    if row[0] == series and row[1] == number:
                        found = True
                        break
                    elif row[0] > series:
                        break
            except Exception as e:
                print(e)
        if found:
            break
    print(datetime.datetime.now() - start)
    if found:
        print("good row in %s: %s" % (fname, row))
        return {'result': True, 'message': 'Passport found'}
    else:
        print("bad row", row)
        return {'result': False, 'message': 'Passport not found in Database'}
To test it, call:
check_passport("bla_%03d.csv", series, number)