luk*_*kik 5 python sqlalchemy multiprocessing python-3.x
我的普通脚本在 20 秒内处理了大约 30,000 条记录。鉴于我必须处理的数据量(超过 5000 万条记录),我认为使用 python 的多处理是明智的。
在我的过程结束时,我使用 sqlalchemy 核心进行了数据库更新,其中我以 50,000 的批次更新处理过的记录。SQLAlchemy Core 要求您向它传递一个列表,以便它进行批量更新甚至插入。我会调用这个列表py_list
对于 Python 的多处理,我通过 a 捕获进程的结果multiprocessing.manager.list(),我将调用mp_list.
一切正常,直到我将 传递mp_list给 SQLAlchemy 批量更新语句。这失败并出现错误AttributeError: 'list' object has no attribute 'keys'。谷歌搜索给我带来了一个关于 SO的问题,它指出 multiprocessing.manager.list() 甚至 multiprocessing.manager.dict() 不是真正的 python 列表/字典。
那么问题是,如何将 multiprocessing.manager.list 转换为真正的 python 列表。
mp_list 填充如下:
import multiprocessing
manager = multiprocessing.Manager()
mp_list = manager.list()
def populate_mp_list(pid, is_processed):
'''Mark the record as having been processed'''
dict = {}
dict['b_id'] = pid
dict['is_processed'] = is_processed
mp_list.append(dict)
Run Code Online (Sandbox Code Playgroud)
抛出错误的SQLALchemy代码如下:
CONN = Engine.connect()
trans = CONN.begin()
stmt = mytable.update().where(mytable.c.id == bindparam('b_id')).\
values(is_processed=bindparam('is_processed'))
CONN.execute(stmt, mp_list)
trans.commit(
Run Code Online (Sandbox Code Playgroud)
我尝试将 mp_list 转换为真正的 python 列表。创建的新列表有效,但其创建的时间损失抵消了多处理中节省的所有时间。
如果我循环返回mp_list并创建一个新列表。
y = []
for x in mp_list:
y.append(x)
Run Code Online (Sandbox Code Playgroud)
此外,如果我对 进行“复制”,则mp_list每个复制都会增加 3 秒!平均罚款,这不酷。
y = mp_list[0:len(mp_list)]
Run Code Online (Sandbox Code Playgroud)
那么,将 multiprocessing.manager.list 转换为 SQLAlchemy Core 可用的列表的最快方法是什么?
希望我没有迟到。
这行不通吗?
pythonlist = list(mp_list)
Run Code Online (Sandbox Code Playgroud)
同样的事情也适用于 dict:-
pythondict = dict(mp_dict)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5650 次 |
| 最近记录: |