Shu*_*Das 1 python postgresql psycopg2 cursor bigdata
我有一个巨大的表(约 8 亿),我需要根据某些段条件获取数据。
数据:
d_id month_id sec average Class
89 201701 S 5.98 A
73 201703 N 7.63 B
31 201708 F 6.38 P
11 201709 K 6.38 P
Run Code Online (Sandbox Code Playgroud)
我有两个清单:
monthList = [201701,201702,201703]
Run Code Online (Sandbox Code Playgroud)
所以sql查询是:
sql_query = str("""select * from dbo.table_name where month_id IN monthList;""")
Run Code Online (Sandbox Code Playgroud)
现在我想将这些数据保存在服务器端游标中,并从中获取基于 classList 的子集
curs = cnxn.cursor('Class')
classList = ['A','B','P']
while True:
records = curs.fetchmany(int(1e3))
if not records:
break
for record in records:
# here I want to use the classList to subset the data , something like
df = pd.DataFrame()
df.append(curs.fetchmany([cursor['Class'] == classList]))
# And once all the records of each Class has been read create csv
df.to_csv("ClassList.csv")
Run Code Online (Sandbox Code Playgroud)
因此,对于上面给出的数据: 将生成 3 个 csv: 1. ClassA.csv
d_id month_id sec average Class
31 201708 F 6.38 P
11 201709 K 6.38 P
Run Code Online (Sandbox Code Playgroud)
所有数据都在 PostgreSQL 中,我使用 psycopg2 调用它
有人可以帮助我解决以下问题吗:1.这是否可以使用服务器端游标来完成。2.我基本上需要根据列表中给出的month_id从所有数据创建每个类的组合csv。
这并不完全是服务器端游标的工作方式 - 它们使服务器在客户端遍历结果集时保持状态,批量获取,可能会反转遍历。好处是服务器维护有关连接的状态,以便客户端可以更有效地分配内存(默认情况下,客户端会在允许代码迭代之前尝试获取所有内容。对于 80 亿行,这可能会导致问题)。
但要记住的关键是,游标返回的数据是由查询决定的——你可以比较每一行的结果来决定做什么,但你仍然是逐行操作,而不是改变服务器返回的结果。但是...如果您滥用服务器,您的 DBA 可能会怀着暴力的意图来追捕您...在进行多次遍历时持有服务器端游标 80 亿行会给数据库带来很大的内存压力,从而减慢速度其他用户。
通过 Pandas 的本地系统内存也是如此 - 根据您的示例,除了使用它生成 CSV 之外,您实际上没有做任何其他事情。
该怎么办?
如果您只需要编写一个大型的组合 CSV,则可以使用 psycopg2 的本机copy_expert功能直接流式传输到 CSV,并结合服务器端游标。
我经常使用这种方法从大型数据集创建 CSV - 同时保持数据库和客户端内存平坦。它也比我直接用 Python 编写的任何逐行 CSV 生成都要快。
最后,不清楚您需要 1 个 CSV 还是 3 个 CSV。您的最终评论引用了“组合 CSV”,因此要在 Michael 的评论的基础上做到这一点,请尝试如下操作:
sql = '''
copy (
select *
from your_table_name
where month_id in (. . .)
and Class in (. . .)
)
to stdout
with (
format csv, header
)'''
stmt = db.cursor('my_cursor')
with open('output.csv', 'w') as outfile:
stmt.copy_expert(sql, outfile)
Run Code Online (Sandbox Code Playgroud)
如果您确实需要 3 个单独的 CSV,则可以修改方法以执行三个单独的传递。
希望有帮助。
看