在python中使用服务器端游标和条件读取海量数据

Shu*_*Das 1 python postgresql psycopg2 cursor bigdata

我有一个巨大的表(约 8 亿),我需要根据某些段条件获取数据。

数据:

d_id    month_id    sec     average Class
89      201701      S       5.98    A
73      201703      N       7.63    B
31      201708      F       6.38    P
11      201709      K       6.38    P
Run Code Online (Sandbox Code Playgroud)

我有两个清单:

monthList = [201701,201702,201703]
Run Code Online (Sandbox Code Playgroud)

所以sql查询是:

sql_query = str("""select * from dbo.table_name where month_id IN monthList;""") 
Run Code Online (Sandbox Code Playgroud)

现在我想将这些数据保存在服务器端游标中,并从中获取基于 classList 的子集

curs = cnxn.cursor('Class')
classList = ['A','B','P']

while True:
    records = curs.fetchmany(int(1e3))
    if not records:
      break
    for record in records:
      # here I want to use the classList to subset the data , something like 
      df = pd.DataFrame()
      df.append(curs.fetchmany([cursor['Class'] == classList]))

      # And once all the records of each Class has been read create csv
      df.to_csv("ClassList.csv")
Run Code Online (Sandbox Code Playgroud)

因此,对于上面给出的数据: 将生成 3 个 csv: 1. ClassA.csv

d_id    month_id    sec     average Class
31      201708      F       6.38    P
11      201709      K       6.38    P
Run Code Online (Sandbox Code Playgroud)

所有数据都在 PostgreSQL 中,我使用 psycopg2 调用它

有人可以帮助我解决以下问题吗:1.这是否可以使用服务器端游标来完成。2.我基本上需要根据列表中给出的month_id从所有数据创建每个类的组合csv。

bim*_*api 5

这并不完全是服务器端游标的工作方式 - 它们使服务器在客户端遍历结果集时保持状态,批量获取,可能会反转遍历。好处是服务器维护有关连接的状态,以便客户端可以更有效地分配内存(默认情况下,客户端会在允许代码迭代之前尝试获取所有内容。对于 80 亿行,这可能会导致问题)。

但要记住的关键是,游标返回的数据是由查询决定的——你可以比较每一行的结果来决定做什么,但你仍然是逐行操作,而不是改变服务器返回的结果。但是...如果您滥用服务器,您的 DBA 可能会怀着暴力的意图来追捕您...在进行多次遍历时持有服务器端游标 80 亿行会给数据库带来很大的内存压力,从而减慢速度其他用户。

通过 Pandas 的本地系统内存也是如此 - 根据您的示例,除了使用它生成 CSV 之外,您实际上没有做任何其他事情。

该怎么办?

如果您只需要编写一个大型的组合 CSV,则可以使用 psycopg2 的本机copy_expert功能直接流式传输到 CSV,并结合服务器端游标

我经常使用这种方法从大型数据集创建 CSV - 同时保持数据库和客户端内存平坦。它也比我直接用 Python 编写的任何逐行 CSV 生成都要快。

最后,不清楚您需要 1 个 CSV 还是 3 个 CSV。您的最终评论引用了“组合 CSV”,因此要在 Michael 的评论的基础上做到这一点,请尝试如下操作:

sql = '''
copy (
    select * 
      from your_table_name 
     where month_id in (. . .) 
       and Class in (. . .)
)
to stdout
with (
  format csv, header
)'''

stmt = db.cursor('my_cursor')
with open('output.csv', 'w') as outfile:
    stmt.copy_expert(sql, outfile)
Run Code Online (Sandbox Code Playgroud)

如果您确实需要 3 个单独的 CSV,则可以修改方法以执行三个单独的传递。

希望有帮助。