从大型CSV文件中快速提取行块

Jef*_*vis 6 python csv performance

我有一个大型的CSV文件,其中包含与库存相关的数据,格式如下:

股票代码,日期,[一些变量......]

因此,每一行都以符号开头(如"AMZN"),然后有日期,然后在所选日期有12个与价格或数量相关的变量.这个文件中有大约10,000种不同的证券,我每天都有一条线,每条股票都是公开交易的.该文件首先按字母顺序按股票代码排序,按时间顺序按日期排序.整个文件大约是3.3 GB.

我想要解决的任务类型是能够针对当前日期提取给定股票代码符号的最新n行数据.我有代码执行此操作,但根据我的观察,平均每次检索需要大约8-10秒(所有测试都提取了100行).

我有我想要运行的功能,需要我抓住数百或数千个符号的这些块,我真的想减少时间.我的代码效率低下,但我不确定如何让它运行得更快.

首先,我有一个名为getData的函数:

def getData(symbol, filename):
  out = ["Symbol","Date","Open","High","Low","Close","Volume","Dividend",
         "Split","Adj_Open","Adj_High","Adj_Low","Adj_Close","Adj_Volume"]
  l = len(symbol)
  beforeMatch = True
  with open(filename, 'r') as f:
    for line in f:
        match = checkMatch(symbol, l, line)
        if beforeMatch and match:
            beforeMatch = False
            out.append(formatLineData(line[:-1].split(",")))
        elif not beforeMatch and match:
            out.append(formatLineData(line[:-1].split(",")))
        elif not beforeMatch and not match:
            break
  return out
Run Code Online (Sandbox Code Playgroud)

(这段代码有几个辅助函数,checkMatch和formatLineData,我将在下面展示.)然后,还有另一个名为getDataColumn的函数,它使用正确的天数来获取我想要的列:

def getDataColumn(symbol, col=12, numDays=100, changeRateTransform=False):
  dataset = getData(symbol)
  if not changeRateTransform:
    column = [day[col] for day in dataset[-numDays:]]
  else:
    n = len(dataset)
    column = [(dataset[i][col] - dataset[i-1][col])/dataset[i-1][col] for i in range(n - numDays, n)]
  return column
Run Code Online (Sandbox Code Playgroud)

(如果为True,changeRateTransform将原始数字转换为每日更改率数字.)帮助函数:

def checkMatch(symbol, symbolLength, line):
  out = False
  if line[:symbolLength+1] == symbol + ",":
    out = True
  return out

def formatLineData(lineData):
  out = [lineData[0]]
  out.append(datetime.strptime(lineData[1], '%Y-%m-%d').date())
  out += [float(d) for d in lineData[2:6]]
  out += [int(float(d)) for d in lineData[6:9]]
  out += [float(d) for d in lineData[9:13]]
  out.append(int(float(lineData[13])))
  return out
Run Code Online (Sandbox Code Playgroud)

有没有人对我的代码的哪些部分运行缓慢以及如何使其表现更好有任何见解?如果不加快速度,我不能做我想做的那种分析.


编辑:为了回应这些评论,我对代码进行了一些更改,以便利用csv模块中的现有方法:

def getData(symbol, database):
  out = ["Symbol","Date","Open","High","Low","Close","Volume","Dividend",
         "Split","Adj_Open","Adj_High","Adj_Low","Adj_Close","Adj_Volume"]
  l = len(symbol)
  beforeMatch = True
  with open(database, 'r') as f:
    databaseReader = csv.reader(f, delimiter=",")
    for row in databaseReader:
        match = (row[0] == symbol)
        if beforeMatch and match:
            beforeMatch = False
            out.append(formatLineData(row))
        elif not beforeMatch and match:
            out.append(formatLineData(row))
        elif not beforeMatch and not match:
            break
  return out

def getDataColumn(dataset, col=12, numDays=100, changeRateTransform=False):
  if not changeRateTransform:
    out = [day[col] for day in dataset[-numDays:]]
  else:
    n = len(dataset)
    out = [(dataset[i][col] - dataset[i-1][col])/dataset[i-1][col] for i in range(n - numDays, n)]
  return out
Run Code Online (Sandbox Code Playgroud)

使用csv.reader类的性能更差.我测试了两种股票,AMZN(靠近文件顶部)和ZNGA(靠近文件底部).使用原始方法,运行时间分别为0.99秒和18.37秒.利用csv模块的新方法,运行时间分别为3.04秒和64.94秒.两者都返回正确的结果.

我的想法是,从寻找股票而不是从解析中获取的时间更多.如果我在文件A中的第一个库存中尝试这些方法,则这些方法都会在大约0.12秒内运行.

Thi*_*ien 3

当您要对同一数据集进行大量分析时,务实的方法是将其全部读入数据库。它是为快速查询而设计的;CSV 不是。例如,使用 sqlite 命令行工具,可以直接从 CSV 导入。然后添加单个索引(Symbol, Date),查找几乎是即时的。

\n\n

如果由于某种原因这是不可行的,例如因为新文件随时可能进入,而您在开始分析它们之前没有足够的准备时间,那么您必须充分利用直接处理 CSV 的方法,这这就是我其余答案的重点。但请记住,这是一种平衡行为。您要么预先支付很多费用,要么为每次查找额外支付一点费用。最终,对于一定数量的查找,预先支付会更便宜。

\n\n

优化就是最大化未完成的工作量。使用发电机和内置csv模块不会有太大帮助。您仍然会读取整个文件并解析所有内容,至少是换行符。有了这么多数据,这是不可能的。

\n\n

解析需要阅读,因此您必须首先找到解决方法。当专用模块无法提供您想要的性能时,将 CSV 格式的所有复杂性留给专用模块的最佳实践没有任何意义。必须进行一些作弊,但尽可能少。在这种情况下,我认为可以安全地假设新行的开头可以被标识为b\'\\n"AMZN",\'(坚持您的示例)。是的,这里是二进制,因为记住:还没有解析。您可以将文件扫描为二进制文件您可以从头开始从那里读取您需要的行数,以正确的方式解码和解析它们,等等。无需在那里进行优化,因为与您没有执行的数十万条不相关的行相比,100 行无需担心为.

\n\n

放弃所有解析会给你带来很多好处,但阅读也需要优化。不要先将整个文件加载到内存中,并尽可能跳过 Python 层。使用mmap可以让操作系统透明地决定将哪些内容加载到内存中,并让您可以直接使用数据。

\n\n

如果符号接近末尾,您仍然可能会读取整个文件。这是一个线性搜索,这意味着它所花费的时间与文件中的行数成线性比例。不过你可以做得更好。由于文件已排序,因此您可以改进该函数以执行一种二分搜索。将采取的步骤数(其中一个步骤是读取一行)接近于行数的二进制对数。换句话说:您可以将文件分成两个(几乎)大小相同的部分的次数。当有 100 万行时,相差五个数量级!

\n\n

这是我根据 Python 自己的想法得出的bisect_left一些措施,以考虑您的“值”跨越多个索引这一事实:

\n\n
import csv\nfrom itertools import islice\nimport mmap\n\ndef iter_symbol_lines(f, symbol):\n    # How to recognize the start of a line of interest\n    ident = b\'"\' + symbol.encode() + b\'",\'\n    # The memory-mapped file\n    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)\n    # Skip the header\n    mm.readline()\n    # The inclusive lower bound of the byte range we\'re still interested in\n    lo = mm.tell()\n    # The exclusive upper bound of the byte range we\'re still interested in\n    hi = mm.size()\n    # As long as the range isn\'t empty\n    while lo < hi:\n        # Find the position of the beginning of a line near the middle of the range\n        mid = mm.rfind(b\'\\n\', 0, (lo+hi)//2) + 1\n        # Go to that position\n        mm.seek(mid)\n        # Is it a line that comes before lines we\'re interested in?\n        if mm.readline() < ident:\n            # If so, ignore everything up to right after this line\n            lo = mm.tell()\n        else:\n            # Otherwise, ignore everything from right before this line\n            hi = mid\n    # We found where the first line of interest would be expected; go there\n    mm.seek(lo)\n    while True:\n        line = mm.readline()\n        if not line.startswith(ident):\n            break\n        yield line.decode()\n\nwith open(filename) as f:\n    r = csv.reader(islice(iter_symbol_lines(f, \'AMZN\'), 10))\n    for line in r:\n        print(line)\n
Run Code Online (Sandbox Code Playgroud)\n\n

对此代码不提供任何保证;我没有太注意边缘情况,并且无法使用您的(任何)文件进行测试,因此请将其视为概念证明。它的速度很快,但是 \xe2\x80\x93 在 SSD 上需要几十毫秒!

\n