python sqlite multithreading
I'm working on an application that will gather data over HTTP from several places, cache the data locally, and then serve it up over HTTP.
So I was looking at the following: my application would first create several threads that gather data at a specified interval and cache that data locally in a SQLite database.
Then in the main thread it would start a CherryPy application that queries that SQLite database and serves the data.
My problem is: how do I handle the connections to the SQLite database from my threads and from the CherryPy application?
If I do a connection per thread to the database, would I also be able to create/use an in-memory database?
Short answer: don't use SQLite3 in a threaded application.
SQLite3 databases scale well in terms of size, but rather poorly for concurrency: you will be plagued by "database is locked" errors.
If you do it anyway, you will need a connection per thread, and you have to ensure that these connections clean up after themselves. This is traditionally handled with thread-local sessions, and SQLAlchemy's ScopedSession does it rather well (for example). I would use it if I were you, even if you aren't using the SQLAlchemy ORM features.
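A minimal sketch of that pattern, assuming SQLAlchemy is installed and using scoped_session purely for thread-local connection management (no ORM); the database path and schema here are placeholders, not from the question:

from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine('sqlite:///cache.db')         # placeholder path
Session = scoped_session(sessionmaker(bind=engine))  # registry of per-thread sessions

def collector_tick():
    session = Session()  # returns this thread's session, creating it on first use
    try:
        session.execute(text('INSERT INTO cache (key, value) VALUES (:k, :v)'),
                        {'k': 'example', 'v': 42})   # placeholder schema
        session.commit()
    finally:
        Session.remove()  # close and discard the thread-local session

Calling Session.remove() at the end of each unit of work is what makes the connections "clean up after themselves".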
This test was done to determine the best way of writing to and reading from a SQLite database. We follow the 3 approaches below: sequential, threaded (ThreadPoolExecutor), and multiprocessed (ProcessPoolExecutor) reads and writes.
Our sample dataset is a dummy-generated OHLC dataset with a symbol, a timestamp, and 6 fake values for o, h, l, c plus volumefrom and volumeto.
Reads
Winner: multiprocessed and normal (sequential)
Writes
Winner: normal (sequential)
Note: not all records get written by the threaded and multiprocessed write approaches. Both visibly run into database-locked errors: SQLite only queues writes up to a certain threshold and then throws a sqlite3.OperationalError indicating the database is locked. The ideal approach would be to retry inserting the same chunk, but there is no point writing that code: even without retrying the locked/failed inserts, the parallel approaches already take more time than a sequential write. Without retries, 97% of the rows were written, and it still took 10x longer than writing sequentially.
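For reference, retrying a locked chunk would look roughly like the sketch below, with exponential backoff between attempts; the function and its parameters are illustrative, not part of the test code:

import sqlite3
import time

def insert_chunk_with_retry(db_path, statement, chunk, retries=5, backoff=0.1):
    # Retry a chunked executemany() when SQLite reports "database is locked".
    conn = sqlite3.connect(db_path)
    try:
        for attempt in range(retries):
            try:
                with conn:  # commits on success, rolls back on error
                    conn.executemany(statement, chunk)
                return True
            except sqlite3.OperationalError:
                time.sleep(backoff * 2 ** attempt)  # back off before the next attempt
        return False  # gave up; the caller still holds the chunk
    finally:
        conn.close()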
Takeaway strategies:
Prefer reading from and writing to SQLite in the same thread.
If you must parallelize, use multiprocessing for reads, whose performance is more or less the same as sequential, and defer to a single thread for the write operations (see the sketch after this list).
Don't use threading for reads or writes, as both become 10x slower; you can thank the GIL for that.
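A minimal sketch of that single-writer pattern, assuming one dedicated writer thread fed by a queue.Queue; the names and the SQL are placeholders, not part of the test code below:

import queue
import sqlite3
import threading

write_queue = queue.Queue()
STOP = object()  # sentinel telling the writer to shut down

def writer_loop(db_path, statement):
    # The only thread that ever writes, so SQLite sees no write contention.
    conn = sqlite3.connect(db_path)
    try:
        while True:
            chunk = write_queue.get()
            if chunk is STOP:
                break
            with conn:  # one transaction per chunk
                conn.executemany(statement, chunk)
    finally:
        conn.close()

writer = threading.Thread(target=writer_loop,
                          args=('ohlc.db', 'INSERT INTO t VALUES (?, ?)'))  # placeholders
writer.start()
# Readers/producers enqueue row tuples with write_queue.put(rows),
# then write_queue.put(STOP) and writer.join() to flush everything.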
Here is the code for the complete test:
import sqlite3
import time
import random
import string
import os
import timeit
from functools import wraps
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
database_file = os.path.realpath('../files/ohlc.db')
create_statement = 'CREATE TABLE IF NOT EXISTS database_threading_test (symbol TEXT, ts INTEGER, o REAL, h REAL, l REAL, c REAL, vf REAL, vt REAL, PRIMARY KEY(symbol, ts))'
insert_statement = 'INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)'
select = 'SELECT * from database_threading_test'
def time_stuff(some_function):
    @wraps(some_function)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        value = some_function(*args, **kwargs)
        print(timeit.default_timer() - t0, 'seconds')
        return value
    return wrapper
def generate_values(count=100):
end = int(time.time()) - int(time.time()) % 900
symbol = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))
ts = list(range(end - count * 900, end, 900))
for i in range(count):
yield (symbol, ts[i], random.random() * 1000, random.random() * 1000, random.random() * 1000, random.random() * 1000, random.random() * 1e9, random.random() * 1e5)
def generate_values_list(symbols=1000, count=100):
values = []
for _ in range(symbols):
values.extend(generate_values(count))
return values
@time_stuff
def sqlite_normal_read():
"""
100k records in the database, 1000 symbols, 100 rows
First run
0.25139795300037804 seconds
Second run
Third run
"""
conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
try:
with conn:
conn.execute(create_statement)
results = conn.execute(select).fetchall()
print(len(results))
except sqlite3.OperationalError as e:
print(e)
@time_stuff
def sqlite_normal_write():
"""
1000 symbols, 100 rows
First run
2.279409104000024 seconds
Second run
2.3364172020001206 seconds
Third run
"""
l = generate_values_list()
conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
try:
with conn:
conn.execute(create_statement)
conn.executemany(insert_statement, l)
except sqlite3.OperationalError as e:
print(e)
@time_stuff
def sequential_batch_read():
"""
We read all the rows for each symbol one after the other in sequence
First run
3.661222331999852 seconds
Second run
2.2836898810001003 seconds
Third run
0.24514851899994028 seconds
Fourth run
0.24082150699996419 seconds
"""
conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
try:
with conn:
conn.execute(create_statement)
symbols = conn.execute("SELECT DISTINCT symbol FROM database_threading_test").fetchall()
for symbol in symbols:
results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
except sqlite3.OperationalError as e:
print(e)
def sqlite_threaded_read_task(symbol):
    results = []
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    try:
        with conn:
            results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
    except sqlite3.OperationalError as e:
        print(e)
    finally:
        conn.close()
    return results
def sqlite_multiprocessed_read_task(symbol):
    results = []
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    try:
        with conn:
            results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
    except sqlite3.OperationalError as e:
        print(e)
    finally:
        conn.close()
    return results
@time_stuff
def sqlite_threaded_read():
"""
1000 symbols, 100 rows per symbol
First run
9.429676861000189 seconds
Second run
10.18928106400017 seconds
Third run
10.382290903000467 seconds
"""
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    symbols = conn.execute("SELECT DISTINCT symbol FROM database_threading_test").fetchall()
    conn.close()
with ThreadPoolExecutor(max_workers=8) as e:
results = e.map(sqlite_threaded_read_task, symbols, chunksize=50)
for result in results:
pass
@time_stuff
def sqlite_multiprocessed_read():
"""
1000 symbols, 100 rows
First run
0.2484774920012569 seconds!!!
Second run
0.24322178500005975 seconds
Third run
0.2863524549993599 seconds
"""
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    symbols = conn.execute("SELECT DISTINCT symbol FROM database_threading_test").fetchall()
    conn.close()
with ProcessPoolExecutor(max_workers=8) as e:
results = e.map(sqlite_multiprocessed_read_task, symbols, chunksize=50)
for result in results:
pass
def sqlite_threaded_write_task(n):
"""
    We ignore the database-locked errors here. The ideal approach would be to retry, but there is no point writing that code when, even without the lock errors, these parallel writes take longer than a sequential write.
"""
conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
data = list(generate_values())
try:
with conn:
conn.executemany("INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)",data)
except sqlite3.OperationalError as e:
print("Database locked",e)
finally:
conn.close()
return len(data)
def sqlite_multiprocessed_write_task(n):
"""
    We ignore the database-locked errors here. The ideal approach would be to retry, but there is no point writing that code when, even without the lock errors, these parallel writes take longer than a sequential write.
"""
conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
data = list(generate_values())
try:
with conn:
conn.executemany("INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)",data)
except sqlite3.OperationalError as e:
print("Database locked",e)
finally:
conn.close()
return len(data)
@time_stuff
def sqlite_threaded_write():
"""
    Did not write all the rows, but the outcome with 97400 rows written is still this...
    Takes 20x as long as a normal write
1000 symbols, 100 rows
First run
28.17819765000013 seconds
Second run
25.557972323000058 seconds
Third run
"""
    symbols = list(range(1000))
with ThreadPoolExecutor(max_workers=8) as e:
results = e.map(sqlite_threaded_write_task, symbols, chunksize=50)
for result in results:
pass
@time_stuff
def sqlite_multiprocessed_write():
"""
1000 symbols, 100 rows
First run
30.09209805699993 seconds
Second run
27.502465319000066 seconds
Third run
"""
    symbols = list(range(1000))
with ProcessPoolExecutor(max_workers=8) as e:
results = e.map(sqlite_multiprocessed_write_task, symbols, chunksize=50)
for result in results:
pass
if __name__ == '__main__':  # guard required for ProcessPoolExecutor on spawn-based platforms
    sqlite_normal_write()