What is the fastest way to search a CSV file?

Юри*_*аев 8 python csv

Task: check whether a given passport series and number exist in a file.

My current solution looks like this:

import csv
import datetime

def check_passport(filename, series: str, number: str) -> dict:
    """
    Find passport number and series
    :param filename:csv filename path
    :param series: passport series
    :param number: passport number
    :return:
    """
    print(f'series={series}, number={number}')
    find = False
    start = datetime.datetime.now()
    with open(filename, 'r', encoding='utf_8_sig') as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        try:
            for row in reader:
                if row[0] == series and row[1] == number:
                    print(row[0])
                    print(row[1])
                    find = True
                    break
        except Exception as e:
            print(e)
    print(datetime.datetime.now() - start)
    if find:
        return {'result': True, 'message': 'Passport found'}
    else:
        return {'result': False, 'message': 'Passport not found in Database'}

Here is a sample of the CSV file:

PASSP_SERIES,PASSP_NUMBER
3604,015558 
6003,711925 
6004,461914 
6001,789369

If the passport is not in the file, the timing is even worse, because every row has to be checked. My best time so far is 53 seconds.

Dar*_*ylG 10

I benchmarked three solutions:

  1. The original poster's line-by-line CSV approach
  2. Scanning the raw text instead of parsing each line into CSV fields with the CSV reader
  3. Reading and processing the data in chunks with Pandas

Results: tested with 10 million to 30 million rows. [plot: log(time) vs. number of rows for the three methods]

Summary: Using Pandas was the slowest method. That is not surprising given the observations in this article (i.e., Pandas is one of the slower ways to read CSV files because of its overhead). The fastest method was to process the file as raw text and look for the numbers in the raw text (about 2x faster than the originally posted approach using the CSV reader). Pandas was about 30% slower than the original method.

Test code:

import timeit
import time
import random
import numpy as np
import pandas as pd
import csv
import matplotlib.pyplot as plt
import math
import itertools

def wrapper(func, *args, **kwargs):
    " Wraps func so timeit can call it with no arguments "
    # Reference https://www.pythoncentral.io/time-a-python-function/
    def wrapped():
        return func(*args, **kwargs)
    return wrapped

def method_line_by_line(filename, series: str, number: str) -> dict:
    """
    Find passport number and series
    :param filename:csv filename path
    :param series: passport series
    :param number: passport number
    :return:
    """
    find = False
    with open(filename, 'r', encoding='utf_8_sig') as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        try:
            for row in reader:
                if row[0] == series and row[1] == number:
                    find = True
                    break
        except Exception as e:
            pass

    if find:
        return {'result': True, 'message': 'Passport found'}
    else:
        return {'result': False, 'message': 'Passport not found in Database'}

def method_raw_text(filename, series: str, number: str) -> dict:
    """
    Find passport number and series by iterating through text records
    :param filename:csv filename path
    :param series: passport series
    :param number: passport number
    :return:
    """
    pattern = series + "," + number
    with open(filename, 'r', encoding='utf_8_sig') as csvfile:
        if any(map(lambda x: pattern == x.rstrip(), csvfile)): # iterates through text looking for match
            return {'result': True, 'message': 'Passport found'}
        else:
            return {'result': False, 'message': 'Passport not found in Database'}

def method_pandas_chunks(filename, series: str, number: str) -> dict:
    """
    Find passport number and series using Pandas in chunks
    :param filename:csv filename path
    :param series: passport series
    :param number: passport number
    :return:
    """
    chunksize = 10 ** 5
    for df in pd.read_csv(filename, chunksize=chunksize, dtype={'PASSP_SERIES': str,'PASSP_NUMBER':str}):

      df_search = df[(df['PASSP_SERIES'] == series) & (df['PASSP_NUMBER'] == number)]

      if not df_search.empty:
        break

    if not df_search.empty:
        return {'result': True, 'message': 'Passport found'}
    else:
        return {'result': False, 'message': 'Passport not found in Database'}

def generate_data(filename, number_records):
    " Generates random data for tests"
    df = pd.DataFrame(np.random.randint(0, 1e6,size=(number_records, 2)), columns=['PASSP_SERIES', 'PASSP_NUMBER'])
    df.to_csv(filename, index = None, header=True)
    return df

def profile():
    Nls = [x for x in range(10000000, 30000001, 5000000)] # range of number of test rows
    number_iterations = 3 # repeats per test
    methods = [method_line_by_line, method_raw_text, method_pandas_chunks]
    time_methods = [[] for _ in range(len(methods))]

    for N in Nls:
        # Generate CSV File with N rows
        generate_data('test.csv', N)

        for i, func in enumerate(methods):
            wrapped = wrapper(func, 'test.csv', 'x', 'y') # Use x & y to ensure we process entire
                                                          # file without finding a match
            time_methods[i].append(math.log(timeit.timeit(wrapped, number=number_iterations)))

    markers = itertools.cycle(('o', '+', '.'))
    colors = itertools.cycle(('r', 'b', 'g'))
    labels = itertools.cycle(('line-by-line', 'raw-text', 'pandas'))
    print(time_methods)
    for i in range(len(time_methods)):
        plt.plot(Nls,time_methods[i],marker = next(markers),color=next(colors),linestyle='-',label=next(labels))

    plt.xlabel('list size', fontsize=18)
    plt.ylabel('log(time)', fontsize=18)
    plt.legend(loc = 'upper left')
    plt.show()

# Run Test
profile()


gel*_*ida 1

The CSV file format is a convenient and simple file format.

It is not made for analytics or fast searches; that was never its goal. It is good for exchanging data between different applications and for tasks where you have to process all entries, or where the number of entries is not very large.

If you want to speed things up, you should read the CSV file once, convert it to a database (e.g., sqlite), and then perform all searches in the database. If the passport numbers are unique, you could even just use a simple dbm file or a Python shelve.

Database performance can be optimized further by adding an index on the fields you search.
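As a minimal sketch of the sqlite route described above (assuming the PASSP_SERIES,PASSP_NUMBER layout from the question; the table name, index name, and file paths are illustrative, not part of the original answer):

```python
import csv
import sqlite3

def build_db(csv_path: str, db_path: str) -> None:
    """One-time import: load the CSV into sqlite and index the search fields."""
    con = sqlite3.connect(db_path)
    con.execute("CREATE TABLE IF NOT EXISTS passports (series TEXT, number TEXT)")
    with open(csv_path, encoding="utf_8_sig") as f:
        reader = csv.reader(f)
        next(reader)  # skip the PASSP_SERIES,PASSP_NUMBER header
        con.executemany("INSERT INTO passports VALUES (?, ?)", reader)
    # the index is what makes later lookups fast (no full table scan)
    con.execute("CREATE INDEX IF NOT EXISTS idx_passports ON passports (series, number)")
    con.commit()
    con.close()

def check_passport(db_path: str, series: str, number: str) -> dict:
    """Indexed lookup instead of scanning the whole CSV file."""
    con = sqlite3.connect(db_path)
    row = con.execute(
        "SELECT 1 FROM passports WHERE series = ? AND number = ? LIMIT 1",
        (series, number)).fetchone()
    con.close()
    if row:
        return {'result': True, 'message': 'Passport found'}
    return {'result': False, 'message': 'Passport not found in Database'}
```

The import is paid once; after that each lookup hits the index and no longer depends on scanning all rows.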

It all depends on how often the CSV file changes and how often you perform searches, but normally this approach should give much better results.

I never really used pandas, but perhaps it is more performant at searching/filtering, though it will never beat searching in a real database.

If you want to go the sqlite or dbm route, I can help write some code.
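For the dbm/shelve variant mentioned above (applicable when each series/number pair is simply present or absent), a minimal sketch could look like this; the key format and function names are illustrative assumptions:

```python
import csv
import shelve

def build_shelf(csv_path: str, shelf_path: str) -> None:
    """One-time import: key each record by 'series,number' for O(1) lookups."""
    with shelve.open(shelf_path, flag="n") as db, \
            open(csv_path, encoding="utf_8_sig") as f:
        reader = csv.reader(f)
        next(reader)  # skip the PASSP_SERIES,PASSP_NUMBER header
        for series, number in reader:
            db[f"{series},{number}"] = True

def check_passport(shelf_path: str, series: str, number: str) -> dict:
    """Constant-time membership test against the shelve file."""
    with shelve.open(shelf_path, flag="r") as db:
        if f"{series},{number}" in db:
            return {'result': True, 'message': 'Passport found'}
    return {'result': False, 'message': 'Passport not found in Database'}
```

Like the sqlite variant, the conversion cost is paid once, and each lookup afterwards is independent of the file size.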

Addendum (search in a sorted csv file with a bisect search before reading with the csv reader):

There is another approach if the first field in the csv file is the series number (or if you are willing to transform the csv file so that it can be sorted with gnu sort).

Just sort the file (easy to do with gnu sort on a Linux system; it can sort huge files without "exploding" memory), and sorting should not take much longer than the search time you currently have.

Then use a bisect/seek search in the file for the first line with the right series number, and after that use your existing function with minor modifications.

This will give you results within milliseconds. I tried with a randomly created csv file containing 30 million entries, about 1.5G in size.

If running on a Linux system, you could even change your code so that it creates a sorted copy of the downloaded csv file whenever the csv file changes. (Sorting takes about 1 to 2 minutes on my machine.) So after 2 to 3 searches per week this already pays off.

import csv
import datetime
import os

def get_line_at_pos(fin, pos):
    """ fetches the first complete line at or after offset pos
        (skips the potentially partial line at pos)
    """
    fin.seek(pos)
    skip = fin.readline()
    # next line for debugging only
    # print("Skip@%d: %r" % (pos, skip))
    npos = fin.tell()
    assert pos + len(skip) == npos
    line = fin.readline()
    return npos, line


def bisect_seek(fname, field_func, field_val):
    """ returns a file position which guarantees that you will
        encounter all lines that might contain field_val,
        if the file is ordered by field_val.
        field_func is the function to extract field_val from a line.
        The search is a bisect search, with a complexity of O(log n)
    """
    size = os.path.getsize(fname)
    minpos, maxpos, cur = 0, size, int(size / 2)

    with open(fname) as fin:
        small_pos = 0
        # next line just for debugging
        state = "?"
        prev_pos = -1
        while True:  # find first id smaller than the one we search
            # next line just for debugging
            pos_str = "%s %10d %10d %10d" % (state, minpos, cur, maxpos)
            realpos, line = get_line_at_pos(fin, cur)
            val = field_func(line)
            # next line just for debugging
            pos_str += "# got @%d: %r %r" % (realpos, val, line)
            if val >= field_val:
                state = ">"
                maxpos = cur
                cur = int((minpos + cur) / 2)
            else:
                state = "<"
                minpos = cur
                cur = int((cur + maxpos) / 2)
            # next line just for debugging
            # print(pos_str)
            if prev_pos == cur:
                break
            prev_pos = cur
    return realpos


def getser(line):
    return line.split(",")[0]


def check_passport(filename, series: str, number: str) -> dict:
    """
    Find passport number and series
    :param filename:csv filename path
    :param series: passport series
    :param number: passport number
    :return:
    """
    print(f'series={series}, number={number}')
    found = False
    start = datetime.datetime.now()
    # find position from which we should start searching
    pos = bisect_seek(filename, getser, series)
    with open(filename, 'r', encoding='utf_8_sig') as csvfile:
        csvfile.seek(pos)
        reader = csv.reader(csvfile, delimiter=',')
        try:
            for row in reader:
                if row[0] == series and row[1] == number:
                    found = True
                    break
                elif row[0] > series:
                    # as file is sorted we know we can abort now
                    break
        except Exception as e:
            print(e)
    print(datetime.datetime.now() - start)
    if found:
        print("good row", row)
        return {'result': True, 'message': f'Passport found'}
    else:
        print("bad row", row)
        return {'result': False, 'message': f'Passport not found in Database'}

Addendum 2019-11-30: Here is a script to split a huge file into smaller chunks and sort each chunk. (I didn't want to implement a full merge sort, because in this context searching each chunk is already efficient enough. If interested in more, I suggest trying to implement a merge sort, or posting a question about sorting huge files under Windows with Python.)

split_n_sort_csv.py:

import itertools
import sys
import time

def main():
    args = sys.argv[1:]
    t = t0 = time.time()
    with open(args[0]) as fin:
        headline = next(fin)
        for idx in itertools.count():
            print(idx, "r")
            tprev = t
            lines = list(itertools.islice(fin, 10000000))
            t = time.time()
            t_read = t - tprev
            tprev = t
            print("s")
            lines.sort()
            t = time.time()
            t_sort = t - tprev
            tprev = t
            print("w")
            with open("bla_%03d.csv" % idx, "w") as fout:
                fout.write(headline)
                for line in lines:
                    fout.write(line)
            t = time.time()
            t_write = t - tprev
            tprev = t

            print("%4.1f %4.1f %4.1f" % (t_read, t_sort, t_write))
            if not lines:
                break
    t = time.time()
    print("Total of %5.1fs" % (t-t0))

if __name__ == "__main__":
    main()

And here is a modified version that searches through all the chunk files:

import csv
import datetime
import itertools
import os

ENCODING='utf_8_sig'

def get_line_at_pos(fin, pos, enc_encoding="utf_8"):
    """ fetches the first complete line at or after offset pos
        (skips the potentially partial line at pos)
    """
    while True:
        fin.seek(pos)
        try:
            skip = fin.readline()
            break
        except UnicodeDecodeError:
            pos += 1

    # print("Skip@%d: %r" % (pos, skip))
    npos = fin.tell()
    assert pos + len(skip.encode(enc_encoding)) == npos
    line = fin.readline()
    return npos, line

def bisect_seek(fname, field_func, field_val, encoding=ENCODING):
    size = os.path.getsize(fname)
    vmin, vmax, cur = 0, size, int(size / 2)
    if encoding.endswith("_sig"):
        enc_encoding = encoding[:-4]
    else:
        enc_encoding = encoding
    with open(fname, encoding=encoding) as fin:
        small_pos = 0
        state = "?"
        prev_pos = -1
        while True:  # find first id smaller than the one we search
            # next line only for debugging
            pos_str = "%s %10d %10d %10d" % (state, vmin, cur, vmax)
            realpos, line = get_line_at_pos(fin, cur, enc_encoding=enc_encoding)
            val = field_func(line)
            # next line only for debugging
            pos_str += "# got @%d: %r %r" % (realpos, val, line)
            if val >= field_val:
                state = ">"
                vmax = cur
                cur = int((vmin + cur) / 2)
            else:
                state = "<"
                vmin = cur
                cur = int((cur + vmax) / 2)
            # next line only for debugging
            # print(pos_str)
            if prev_pos == cur:
                break
            prev_pos = cur
    return realpos

def getser(line):
    return line.split(",")[0]

def check_passport(filename, series: str, number: str,
        encoding=ENCODING) -> dict:
    """
    Find passport number and series
    :param filename:csv filename path
    :param series: passport series
    :param number: passport number
    :return:
    """
    print(f'series={series}, number={number}')
    found = False
    start = datetime.datetime.now()
    for ctr in itertools.count():
        fname = filename % ctr
        if not os.path.exists(fname):
            break
        print(fname)
        pos = bisect_seek(fname, getser, series)
        with open(fname, 'r', encoding=encoding) as csvfile:
            csvfile.seek(pos)
            reader = csv.reader(csvfile, delimiter=',')
            try:
                for row in reader:
                    if row[0] == series and row[1] == number:
                        found = True
                        break
                    elif row[0] > series:
                        break
            except Exception as e:
                print(e)
        if found:
            break
    print(datetime.datetime.now() - start)
    if found:
        print("good row in %s: %r" % (fname, row))
        return {'result': True, 'message': f'Passport found'}
    else:
        print("bad row", row)
        return {'result': False, 'message': f'Passport not found in Database'}

To test it, call:

check_passport("bla_%03d.csv", series, number)