Hae*_*lle 9 python sql csv pandas
似乎从CSV加载数据比使用Pandas的SQL(Postgre SQL)更快.(我有一个SSD)
这是我的测试代码:
import pandas as pd
import numpy as np
start = time.time()
df = pd.read_csv('foo.csv')
df *= 3
duration = time.time() - start
print('{0}s'.format(duration))
engine = create_engine('postgresql://user:password@host:port/schema')
start = time.time()
df = pd.read_sql_query("select * from mytable", engine)
df *= 3
duration = time.time() - start
print('{0}s'.format(duration))
Run Code Online (Sandbox Code Playgroud)
foo.csv和数据库是相同的(两列中的数据和列数相同,4列,10万行,随机int).
CSV需要0.05秒
SQL需要0.5秒
您认为CSV比SQL快10倍是正常的吗?我想知道我在这里遗失了什么......
这是一种正常行为,读取csv文件始终是简单加载数据的最快方法之一
CSV非常天真和简单.直接从它加载将非常快.对于具有复杂结构的海量数据库,CSV不是一种选择.SQL从表中选择数据是非常快速的,并将数据返回给您.当然,如果您可以选择,修改和操作数据,它将为您的通话增加开销时间成本.
想象一下你在csv中从1920年到2017年的csv中有一个时间序列,但你只想要从2010年到今天的数据.
csv方法是加载整个csv然后选择2010年到2017年.
SQL方法是通过SQL select函数预先选择年份
在那种情况下,SQL会更快.
虽然 Steven G 对该过程的解释从根本上回答了您的问题,而 Simon G 的COPY解决方案是我能找到的最有效的解决方案,但我决定更深入地研究您的问题,并实际衡量相关的不同方面到它。
在https://github.com/mikaelhg/pandas-pg-csv-speed-poc有一个项目,其中包含各种替代解决方案的 pytest 基准测试。
此测试的 CSV 比问题中的 CSV 大一个数量级,形状为(3742616, 6). 只是为了确保各种缓冲区的大小恰到好处而使结果产生偏差的可能性较小。
感谢芬兰交通安全局 Trafi 的开放数据计划提供了测试数据。
至于 PostgreSQL 安装,它位于规范的 Docker 容器内,并以 uppedshared_buffers和work_memvalues启动,数据文件存储在主机的/dev/shm挂载点下,以抵消实际的磁盘 I/O。它的 UNIX 套接字连接点也同样暴露。
version: '3'
services:
db:
image: 'postgres:10-alpine'
command: "postgres -c 'shared_buffers=512MB' -c 'temp_buffers=80MB' -c 'work_mem=256MB'"
ports:
- '5432:5432'
volumes:
- '/dev/shm/pgtest/data:/var/lib/postgresql/data'
- '/dev/shm/pgtest/run:/var/run/postgresql'
environment:
POSTGRES_USER: test
POSTGRES_PASSWORD: test
POSTGRES_DB: test
test:
image: pandas_speed_poc:temp
build:
context: .
dockerfile: Dockerfile.test-runner
volumes:
- '.:/app'
- '/dev/shm/pgtest/run:/var/run/postgresql'
working_dir: '/app'
user: '1000'
Run Code Online (Sandbox Code Playgroud)
测试运行器是一个简单的 Ubuntu 18.04 容器:
FROM ubuntu:18.04
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get -qq update && \
apt-get -y -qq install python3-dev python3-pip python3-psycopg2 \
build-essential \
bash less nano wait-for-it
RUN pip3 install sqlalchemy numpy pandas \
pytest pytest-benchmark
WORKDIR /app
CMD wait-for-it db:5432 -- /bin/bash -c "trap : TERM INT; sleep infinity & wait"
Run Code Online (Sandbox Code Playgroud)
实际的基准测试是unittest为pytest-benchmark以下目的编写的 Python 3 :
#!/usr/bin/python3
from sqlalchemy import create_engine
import psycopg2
import psycopg2.extensions
import pandas as pd
import numpy as np
import io
import time
import gzip
import unittest
import pytest
DATA_FILE = 'data/licenses.csv.gz'
DROP_TABLE = "DROP TABLE IF EXISTS licenses"
CREATE_TABLE = """
CREATE TABLE licenses (
a VARCHAR(16),
b CHAR(3),
c CHAR(6),
d INTEGER,
e INTEGER,
f INTEGER
)
"""
COPY_FROM = """
COPY licenses (a, b, c, d, e, f) FROM STDIN
WITH (FORMAT CSV, DELIMITER ';', HEADER)
"""
COPY_TO = "COPY licenses TO STDOUT WITH (FORMAT CSV, HEADER)"
SELECT_FROM = 'SELECT * FROM licenses'
VACUUM = "VACUUM FULL ANALYZE"
DB_UNIX_SOCKET_URL = 'postgresql://test:test@/test'
DB_TCP_URL = 'postgresql://test:test@db/test'
def my_cursor_factory(*args, **kwargs):
cursor = psycopg2.extensions.cursor(*args, **kwargs)
cursor.itersize = 10240
return cursor
class TestImportDataSpeed(unittest.TestCase):
@pytest.fixture(autouse=True)
def setupBenchmark(self, benchmark):
self.benchmark = benchmark
@classmethod
def setUpClass(cls):
cls.engine = create_engine(DB_TCP_URL, connect_args={'cursor_factory': my_cursor_factory})
connection = cls.engine.connect().connection
cursor = connection.cursor()
cursor.execute(DROP_TABLE)
cursor.execute(CREATE_TABLE)
with gzip.open(DATA_FILE, 'rb') as f:
cursor.copy_expert(COPY_FROM, file=f, size=1048576)
connection.commit()
connection.set_session(autocommit=True)
cursor.execute(VACUUM)
cursor.close()
connection.close()
def test_pd_csv(self):
def result():
return pd.read_csv(DATA_FILE, delimiter=';', low_memory=False)
df = self.benchmark(result)
assert df.shape == (3742616, 6)
def test_psycopg2_cursor(self):
def result():
connection = self.engine.connect().connection
cursor = connection.cursor()
cursor.itersize = 102400
cursor.arraysize = 102400
cursor.execute(SELECT_FROM)
rows = cursor.fetchall()
cursor.close()
connection.close()
return pd.DataFrame(rows)
df = self.benchmark(result)
assert df.shape == (3742616, 6)
def test_pd_sqla_naive(self):
def result():
return pd.read_sql_query(SELECT_FROM, self.engine)
df = self.benchmark(result)
assert df.shape == (3742616, 6)
def test_pd_sqla_chunked(self):
def result():
gen = (x for x in pd.read_sql(SELECT_FROM, self.engine, chunksize=10240))
return pd.concat(gen, ignore_index=True)
df = self.benchmark(result)
assert df.shape == (3742616, 6)
def test_pg_copy(self):
connection = self.engine.connect().connection
cursor = connection.cursor()
def result(cursor):
f = io.StringIO()
cursor.copy_expert(COPY_TO, file=f, size=1048576)
f.seek(0)
return pd.read_csv(f, low_memory=False)
df = self.benchmark(result, cursor)
assert df.shape == (3742616, 6)
Run Code Online (Sandbox Code Playgroud)
最终结果:
speed_test.py .....
-------------------------------------------------------------------------------- benchmark: 5 tests -------------------------------------------------------------------------------
Name (time in s) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_pd_csv 1.4623 (1.0) 1.4903 (1.0) 1.4776 (1.0) 0.0110 (1.21) 1.4786 (1.0) 0.0171 (1.15) 2;0 0.6768 (1.0) 5 1
test_pg_copy 3.0631 (2.09) 3.0842 (2.07) 3.0732 (2.08) 0.0091 (1.0) 3.0769 (2.08) 0.0149 (1.0) 2;0 0.3254 (0.48) 5 1
test_psycopg2_cursor 4.5325 (3.10) 4.5724 (3.07) 4.5531 (3.08) 0.0161 (1.77) 4.5481 (3.08) 0.0249 (1.68) 2;0 0.2196 (0.32) 5 1
test_pd_sqla_naive 6.0177 (4.12) 6.0523 (4.06) 6.0369 (4.09) 0.0147 (1.62) 6.0332 (4.08) 0.0242 (1.63) 2;0 0.1656 (0.24) 5 1
test_pd_sqla_chunked 6.0247 (4.12) 6.1454 (4.12) 6.0889 (4.12) 0.0442 (4.86) 6.0963 (4.12) 0.0524 (3.52) 2;0 0.1642 (0.24) 5 1
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Run Code Online (Sandbox Code Playgroud)
使用 PostgreSQL 数据库时,您可以结合使用 SQL 和 CSV,以充分利用这两种方法。SQL 准确选择您需要的数据,并使用 CSV 输出快速将其加载到 pandas DataFrame 中。
conn = psycopg2.connect(**conn_params)
with conn.cursor() as cur:
sql = 'SELECT * FROM large_table'
buf = io.StringIO()
cur.copy_expert(f'COPY ({sql}) TO STDOUT WITH CSV HEADER', buf)
buf.seek(0)
df = pd.read_csv(buf, header=0, low_memory=False,
true_values='t', false_values='f')
conn.close()
Run Code Online (Sandbox Code Playgroud)
它使用 PostgreSQL 的快速 COPY 命令结合 psycopg2 的copy_expert()函数将查询结果以 CSV 格式读取到字符串缓冲区中。然后您可以在该字符串缓冲区上使用 pandas read_csv()。
缺点是您可能必须随后转换数据类型(例如时间戳将是字符串)。该read_csv()函数有一些参数可以帮助解决这个问题(例如,,,,parse_dates... )。true_valuesfalse_values
在我的用例(3000 万行,15 列)中,与 pandas 函数相比,这使我的性能提高了约 2-3 倍read_sql()。
| 归档时间: |
|
| 查看次数: |
4343 次 |
| 最近记录: |