我有一个文件,该文件包含超过400万行,我希望逐行读取它们,并对第一个“列”数据执行操作。可能有重复的数据,我想确保只执行一次这些操作,并且能够重新启动程序并从上次中断的地方开始。
我的解决方案是解析该行,将值存储到列表中,然后“如果x不在列表中”执行操作,然后在程序死/停止/被杀死时将该列表存储为泡菜。
但是,脚本运行了几个小时后会出现段错误,我认为这是因为由于在checkedFile列表中存储了大量MD5和,因此我已经烧光了所有RAM。
因为文件太大(超过400万个条目),所以我需要快速查找的东西,希望有一个简单的解决方案。谢谢!
数据如下所示:6e5f6c90e6bf31c31283afb8dca93da8 | mouse.gif | 10102017
我当前的代码如下所示:
checkedFile = []
def readline(inFile):
print("Opening file: %s\n" % inFile)
try:
with open(inFile, "r") as inputFile:
for line in inputFile: # read in file line by line
fileHash, garbage = line.split("|",1)
if fileHash not in checkedFile:
checkedFile.append(fileHash)
checkit(fileHash) #
Run Code Online (Sandbox Code Playgroud)
我想您遇到的问题之一是您选择的数据结构。由于checkedFile是一个列表,因此检查哈希是否在列表中具有O(N) 时间复杂度。我的第一个建议是使用 python set。这使用了性能为 的哈希映射O(1)。
checkedFile = set()
def readline(inFile):
print("Opening file: %s\n" % inFile)
with open(inFile, "r") as inputFile:
for line in inputFile: # read in file line by line
fileHash, garbage = line.split("|",1)
if fileHash not in checkedFile:
checkedFile.add(fileHash)
checkit(fileHash)
Run Code Online (Sandbox Code Playgroud)
Python 真正出现段错误的原因有两个,所以你肯定会内存不足。我觉得很奇怪,你在列表存储中出现了段错误,因为每个散列都是 16 字节 * 400 万 = 64Mb,我认为这与典型计算机中的内存量相差甚远。
走持久存储路线我建议使用 sqlite。这种方法不会耗尽内存,因为它将存储在您的硬盘上。
import sqlite3
connection = sqlite3.connect('cache.db')
CREATE_CACHE_TABLE = '''CREATE TABLE IF NOT EXISTS cache (
hash TEXT PRIMARY KEY
)'''
SELECT_STATEMENT = 'SELECT hash FROM cache WHERE hash = ?'
INSERT_STATEMENT = 'INSERT INTO cache VALUES ?'
with connection:
connection.execute(CREATE_CACHE_TABLE)
with open(inFile, "r") as inputFile:
for line in inputFile:
fileHash, garbage = line.split("|", 1)
with connection:
if not connection.execute(SELECT_STATMENT, (fileHash,)).fetchone():
connection.execute(INSERT_STATEMENT, (fileHash,))
checkit(fileHash)
Run Code Online (Sandbox Code Playgroud)
作为额外的好处,您还可以缓存结果,checkit以便在计算因某种原因停止时可以恢复计算。这也对您有利,因为网络请求的最短响应时间为10ms。您提到 Web 请求返回 json 响应。我之前写过sqlite with python how to store json in sqlite。您需要修改该函数checkit以返回 json 数据。
import sqlite3
import json
def adapt_json(data):
return (json.dumps(data, sort_keys=True)).encode()
def convert_json(blob):
return json.loads(blob.decode())
sqlite3.register_adapter(dict, adapt_json)
sqlite3.register_converter('JSON', convert_json)
connection = sqlite3.connect('cache.db', detect_types=sqlite3.PARSE_DECLTYPES)
CREATE_CACHE_TABLE = '''CREATE TABLE IF NOT EXISTS cache (
hash TEXT PRIMARY KEY,
check_results JSON
)'''
SELECT_STATEMENT = 'SELECT hash FROM cache WHERE hash = ?'
INSERT_STATEMENT = 'INSERT INTO cache VALUES ?, ?'
with connection:
connection.execute(CREATE_CACHE_TABLE)
with open(inFile, "r") as inputFile:
for line in inputFile:
fileHash, garbage = line.split("|", 1)
with connection:
if not connection.execute(SELECT_STATMENT, (fileHash,)).fetchone():
json_data = checkit(fileHash)
connection.execute(INSERT_STATEMENT, (fileHash, json_data))
Run Code Online (Sandbox Code Playgroud)
如果您使用最后一种方法。您可以与文件的不同块并行运行该程序,它仍然可以工作。此外,如果您最终得到更大的文件,这种方法应该可以很好地扩展。
| 归档时间: |
|
| 查看次数: |
2441 次 |
| 最近记录: |