Large (6 million row) pandas df causes a memory error with `to_sql` when chunksize=100, but a 100,000-row version saves easily without chunksize

San*_*ta7 5 python sql pandas

I created a large DataFrame in pandas with about 6 million rows of text data. I want to save it as a SQL database file, but when I try to save it, I get an out-of-memory RAM error. I even reduced the chunksize to 100 and it still crashes.

However, if I take a smaller version of this DataFrame with only 100,000 rows and save it to a database without specifying chunksize, I have no problem saving it.

Here is my code:

from sqlalchemy import create_engine

engine = create_engine("sqlite:///databasefile.db")
dataframe.to_sql("CS_table", engine, chunksize=100)

My understanding was that since it only processes 100 rows at a time, RAM usage should reflect saving 100 rows at a time. Is something else happening behind the scenes? Perhaps multithreading?

Before running this code I was using 4.8 GB of RAM, out of the 12.8 GB available in Google Colab. Running the code above eats up all the RAM until the environment crashes.

I would like to be able to save my pandas DataFrame to a SQL file without my environment crashing. The environment I'm in is Google Colab. The DataFrame has 2 columns and ~6 million rows. Each cell contains about this much text:

"After training for 3.5 days on eight GPUs, our model establishes a new single-model state-of-the-art BLEU score of 41.8, at a small fraction of the training costs of the best models from the literature. We show that the Transformer generalizes well to other tasks by applying it successfully to English constituency parsing both with large and limited training data."

Edit:

I did keyboard interrupts at different stages. Here is the result of a keyboard interrupt right after the first jump in RAM:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-22-51b6e444f80d> in <module>()
----> 1 dfAllT.to_sql("CS_table23", engine, chunksize = 100)

12 frames
/usr/local/lib/python3.6/dist-packages/pandas/core/generic.py in to_sql(self, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
   2529         sql.to_sql(self, name, con, schema=schema, if_exists=if_exists,
   2530                    index=index, index_label=index_label, chunksize=chunksize,
-> 2531                    dtype=dtype, method=method)
   2532 
   2533     def to_pickle(self, path, compression='infer',

/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py in to_sql(frame, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
    458     pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index,
    459                       index_label=index_label, schema=schema,
--> 460                       chunksize=chunksize, dtype=dtype, method=method)
    461 
    462 

/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py in to_sql(self, frame, name, if_exists, index, index_label, schema, chunksize, dtype, method)
   1172                          schema=schema, dtype=dtype)
   1173         table.create()
-> 1174         table.insert(chunksize, method=method)
   1175         if (not name.isdigit() and not name.islower()):
   1176             # check for potentially case sensitivity issues (GH7815)

/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py in insert(self, chunksize, method)
    684 
    685                 chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])
--> 686                 exec_insert(conn, keys, chunk_iter)
    687 
    688     def _query_iterator(self, result, chunksize, columns, coerce_float=True,

/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py in _execute_insert(self, conn, keys, data_iter)
    597         """
    598         data = [dict(zip(keys, row)) for row in data_iter]
--> 599         conn.execute(self.table.insert(), data)
    600 
    601     def _execute_insert_multi(self, conn, keys, data_iter):

/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in execute(self, object_, *multiparams, **params)
    986             raise exc.ObjectNotExecutableError(object_)
    987         else:
--> 988             return meth(self, multiparams, params)
    989 
    990     def _execute_function(self, func, multiparams, params):

/usr/local/lib/python3.6/dist-packages/sqlalchemy/sql/elements.py in _execute_on_connection(self, connection, multiparams, params)
    285     def _execute_on_connection(self, connection, multiparams, params):
    286         if self.supports_execution:
--> 287             return connection._execute_clauseelement(self, multiparams, params)
    288         else:
    289             raise exc.ObjectNotExecutableError(self)

/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in _execute_clauseelement(self, elem, multiparams, params)
   1105             distilled_params,
   1106             compiled_sql,
-> 1107             distilled_params,
   1108         )
   1109         if self._has_events or self.engine._has_events:

/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1246         except BaseException as e:
   1247             self._handle_dbapi_exception(
-> 1248                 e, statement, parameters, cursor, context
   1249             )
   1250 

/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
   1466                 util.raise_from_cause(sqlalchemy_exception, exc_info)
   1467             else:
-> 1468                 util.reraise(*exc_info)
   1469 
   1470         finally:

/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py in reraise(tp, value, tb, cause)
    127         if value.__traceback__ is not tb:
    128             raise value.with_traceback(tb)
--> 129         raise value
    130 
    131     def u(s):

/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1222                 if not evt_handled:
   1223                     self.dialect.do_executemany(
-> 1224                         cursor, statement, parameters, context
   1225                     )
   1226             elif not parameters and context.no_parameters:

/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py in do_executemany(self, cursor, statement, parameters, context)
    545 
    546     def do_executemany(self, cursor, statement, parameters, context=None):
--> 547         cursor.executemany(statement, parameters)
    548 
    549     def do_execute(self, cursor, statement, parameters, context=None):

KeyboardInterrupt: 

Here is the result if I do a keyboard interrupt right before it crashes:

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 2882, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-24-68b60fe221fe>", line 1, in <module>
    dfAllT.to_sql("CS_table22", engine, chunksize = 100)
  File "/usr/local/lib/python3.6/dist-packages/pandas/core/generic.py", line 2531, in to_sql
    dtype=dtype, method=method)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 460, in to_sql
    chunksize=chunksize, dtype=dtype, method=method)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 1174, in to_sql
    table.insert(chunksize, method=method)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 686, in insert
    exec_insert(conn, keys, chunk_iter)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 599, in _execute_insert
    conn.execute(self.table.insert(), data)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 988, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1107, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1248, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1468, in _handle_dbapi_exception
    util.reraise(*exc_info)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 129, in reraise
    raise value
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1224, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 547, in do_executemany
    cursor.executemany(statement, parameters)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 1823, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'KeyboardInterrupt' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 1132, in get_records
    return _fixed_getinnerframes(etb, number_of_lines_of_context, tb_offset)
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 313, in wrapped
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 358, in _fixed_getinnerframes
    records = fix_frame_records_filenames(inspect.getinnerframes(etb, context))
  File "/usr/lib/python3.6/inspect.py", line 1488, in getinnerframes
    frameinfo = (tb.tb_frame,) + getframeinfo(tb, context)
  File "/usr/lib/python3.6/inspect.py", line 1446, in getframeinfo
    filename = getsourcefile(frame) or getfile(frame)
  File "/usr/lib/python3.6/inspect.py", line 696, in getsourcefile
    if getattr(getmodule(object, filename), '__loader__', None) is not None:
  File "/usr/lib/python3.6/inspect.py", line 739, in getmodule
    f = getabsfile(module)
  File "/usr/lib/python3.6/inspect.py", line 708, in getabsfile
    _filename = getsourcefile(object) or getfile(object)
  File "/usr/lib/python3.6/inspect.py", line 693, in getsourcefile
    if os.path.exists(filename):
  File "/usr/lib/python3.6/genericpath.py", line 19, in exists
    os.stat(path)
KeyboardInterrupt

I ran it once more before it crashed, and this seems to give yet another different result:

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 2882, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-28-f18004debe33>", line 1, in <module>
    dfAllT.to_sql("CS_table25", engine, chunksize = 100)
  File "/usr/local/lib/python3.6/dist-packages/pandas/core/generic.py", line 2531, in to_sql
    dtype=dtype, method=method)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 460, in to_sql
    chunksize=chunksize, dtype=dtype, method=method)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 1174, in to_sql
    table.insert(chunksize, method=method)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 686, in insert
    exec_insert(conn, keys, chunk_iter)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 598, in _execute_insert
    data = [dict(zip(keys, row)) for row in data_iter]
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 598, in <listcomp>
    data = [dict(zip(keys, row)) for row in data_iter]
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 1823, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'KeyboardInterrupt' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 1132, in get_records
    return _fixed_getinnerframes(etb, number_of_lines_of_context, tb_offset)
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 313, in wrapped
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 358, in _fixed_getinnerframes
    records = fix_frame_records_filenames(inspect.getinnerframes(etb, context))
  File "/usr/lib/python3.6/inspect.py", line 1488, in getinnerframes
    frameinfo = (tb.tb_frame,) + getframeinfo(tb, context)
  File "/usr/lib/python3.6/inspect.py", line 1446, in getframeinfo
    filename = getsourcefile(frame) or getfile(frame)
  File "/usr/lib/python3.6/inspect.py", line 696, in getsourcefile
    if getattr(getmodule(object, filename), '__loader__', None) is not None:
  File "/usr/lib/python3.6/inspect.py", line 742, in getmodule
    os.path.realpath(f)] = module.__name__
  File "/usr/lib/python3.6/posixpath.py", line 388, in realpath
    path, ok = _joinrealpath(filename[:0], filename, {})
  File "/usr/lib/python3.6/posixpath.py", line 421, in _joinrealpath
    newpath = join(path, name)
KeyboardInterrupt
---------------------------------------------------------------------------

Other things I have tried:

Using dropna to remove all none/nan values

dfAllT = dfAllT.applymap(str) to make sure all my values are strings

dfAllT.reset_index(drop=True, inplace=True) to make sure the index is not misaligned

Edit:

As mentioned in the comments, I have now tried using to_sql in a loop.

for i in range(586147):
    print(i)
    dfAllT.iloc[i*10000:(i+1)*10000].to_sql('CS_table', engine, if_exists= 'append')

This also eventually eats up my RAM and causes a crash about halfway through. I wonder if this suggests that sqlite is holding everything in memory, and whether there is a workaround.
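One workaround in that direction (a sketch using only the stdlib sqlite3 driver, not the code from the question; the table and column names are made up) is to skip SQLAlchemy entirely and commit each chunk yourself, so nothing accumulates between chunks:

```python
import sqlite3

def insert_in_chunks(conn, rows, chunksize=10_000):
    """Insert (title, summary) rows in fixed-size chunks, committing each one."""
    conn.execute("CREATE TABLE IF NOT EXISTS CS_table (title TEXT, summary TEXT)")
    for start in range(0, len(rows), chunksize):
        chunk = rows[start:start + chunksize]
        conn.executemany("INSERT INTO CS_table VALUES (?, ?)", chunk)
        conn.commit()  # flush this chunk; nothing carries over to the next one

conn = sqlite3.connect(":memory:")  # use a file path for a real database
rows = [(f"title {i}", f"summary {i}") for i in range(25_000)]
insert_in_chunks(conn, rows, chunksize=10_000)
print(conn.execute("SELECT COUNT(*) FROM CS_table").fetchone()[0])  # 25000
```

With a real 6-million-row DataFrame you would feed `df.itertuples(index=False)` in slices rather than a prebuilt list, so only one chunk of Python objects exists at a time.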

Edit:

I tried a few more things: shorter chunks, and disposing of the engine and creating a new one after each step. It still eventually ate all the memory and crashed.

import gc
import sqlalchemy

for i in range(586147):
    print(i)
    engine = sqlalchemy.create_engine("sqlite:///CSTitlesSummariesData.db")
    dfAllT.iloc[i*10:(i+1)*10].to_sql('CS_table', engine, index=False, if_exists='append')
    engine.dispose()
    gc.collect()  # note: written as bare `gc.collect` (no parentheses) it would do nothing

My thoughts:

So it looks like the whole database is somehow being kept in active memory.

The pandas DataFrame it was made from is 5 GB (or at least that is how much RAM it was using before I tried converting it to sqlite). My system crashes at about 12.72 GB. I would guess the sqlite database takes up less RAM than the pandas DataFrame.
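For what it's worth, SQLite itself does not normally keep the whole database file in RAM; its page cache is bounded by PRAGMA cache_size (typically around 2 MiB by default). A quick stdlib check of those settings:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# cache_size: a negative value means KiB of cache; positive means number of pages
cache = conn.execute("PRAGMA cache_size").fetchone()[0]
page = conn.execute("PRAGMA page_size").fetchone()[0]
print(cache, page)  # e.g. -2000 (~2 MiB of cache) with 4096-byte pages
```

So the RAM growth here is more plausibly on the pandas/SQLAlchemy side than inside SQLite.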

小智 5

I had been using df.to_sql for a year, and was struggling with the fact that I was throwing a lot of resources at it and it still didn't work. I realized that chunksize was overloading my memory: pandas loads everything into memory and then sends it out in chunks. I had to control it directly with SQL. (This is where I found the solution -> https://github.com/pandas-dev/pandas/issues/12265 I really encourage you to read it to the end.)

If you need to read data from the database without overloading memory, check this code:

import math
import pandas as pd

def get_data_by_chunks(chunksize: int):
    # MysqlClient is the answerer's own helper that returns a SQLAlchemy engine
    with MysqlClient.get_engine().begin() as conn:
        row_count = conn.execute("SELECT COUNT(*) FROM my_table").fetchone()[0]
        for i in range(math.ceil(row_count / chunksize)):
            query = f"""
                SELECT * FROM my_table
                LIMIT {i * chunksize}, {chunksize};
            """
            yield pd.read_sql(query, conn)

for df in get_data_by_chunks(chunksize=10_000):
    print(df.shape)

  • This answer was very helpful to me; from the attached link, connection.execution_options(stream_results=True) solved it for me. (2 upvotes)
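The idea behind stream_results is to fetch rows incrementally instead of materializing the whole result set at once. A minimal sketch of the same pattern with the stdlib sqlite3 driver uses cursor.fetchmany (with SQLAlchemy, connection.execution_options(stream_results=True) gives the equivalent behavior):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(1000)])

cur = conn.execute("SELECT x FROM t")
total = 0
while True:
    batch = cur.fetchmany(100)  # pull 100 rows at a time instead of fetchall()
    if not batch:
        break
    total += len(batch)
print(total)  # 1000
```

Only one batch of rows is ever held in Python memory, which is exactly the property the commenter needed.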

And*_*den 3

From stepping through the code, I think it's this line, which creates a bunch of DataFrames:

chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])

It looks like this is probably a bug. Specifically, it happens during the preparation step before the database insert.

One trick you can use is to hit CTRL-C while memory is rapidly increasing and see which line it pauses on (my bet is that it's this one).

User edit:

The problem was solved by using an explicit loop (rather than relying on chunksize), i.e. for i in range(100): df.iloc[i * 100000:(i+1) * 100000].to_sql(...)

This still eventually led to a memory error, but it allowed the user to continue from where the loop stopped before the crash.

A more robust solution would be "maybe try a raw connection, rather than using SQLEngine?" But the user never got a chance to try this.
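Combining the two suggestions, here is a sketch of a restartable explicit loop over a raw stdlib sqlite3 connection (the table and column names are made up; this is not code the user ran). Since each chunk is committed separately, a fresh run can count what is already in the table and resume from there:

```python
import sqlite3

def append_resumable(conn, rows, chunksize=100):
    """Append rows chunk by chunk; on restart, skip the rows already committed."""
    conn.execute("CREATE TABLE IF NOT EXISTS CS_table (title TEXT, summary TEXT)")
    done = conn.execute("SELECT COUNT(*) FROM CS_table").fetchone()[0]
    for i in range(done, len(rows), chunksize):
        conn.executemany("INSERT INTO CS_table VALUES (?, ?)", rows[i:i + chunksize])
        conn.commit()  # each chunk is durable, so a crash loses at most one chunk

conn = sqlite3.connect(":memory:")  # use a file path so progress survives a crash
rows = [(f"t{i}", f"s{i}") for i in range(250)]
append_resumable(conn, rows, chunksize=100)  # writes all 250 rows
append_resumable(conn, rows, chunksize=100)  # a second run is a no-op
print(conn.execute("SELECT COUNT(*) FROM CS_table").fetchone()[0])  # 250
```

This avoids both problems above: no SQLAlchemy parameter lists accumulate, and a crash partway through does not force starting over.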