multiprocessing.Pool.imap_unordered与固定队列大小或缓冲区?

Chr*_*isP 7 python sqlite generator python-3.4 python-multiprocessing

我正在从大型CSV文件中读取数据,对其进行处理并将其加载到SQLite数据库中.分析表明80%的时间花在I/O上,20%是处理输入以准备数据库插入.我加快了处理步骤,multiprocessing.Pool以便I/O代码永远不会等待下一条记录.但是,这导致了严重的内存问题,因为I/O步骤无法跟上工作人员的步伐.

以下玩具示例说明了我的问题:

#!/usr/bin/env python  # 3.4.3
import time
from multiprocessing import Pool

def records(num=100):
    """Simulate generator getting data from large CSV files."""
    for i in range(num):
        print('Reading record {0}'.format(i))
        time.sleep(0.05)  # getting raw data is fast
        yield i

def process(rec):
    """Simulate processing of raw text into dicts."""
    print('Processing {0}'.format(rec))
    time.sleep(0.1)  # processing takes a little time
    return rec

def writer(records):
    """Simulate saving data to SQLite database."""
    for r in records:
        time.sleep(0.3)  # writing takes the longest
        print('Wrote {0}'.format(r))

if __name__ == "__main__":
    data = records(100)
    with Pool(2) as pool:
        writer(pool.imap_unordered(process, data, chunksize=5))
Run Code Online (Sandbox Code Playgroud)

此代码导致记录积压,最终消耗所有内存,因为我无法足够快地将数据持久保存到磁盘.运行代码,您会注意到,Pool.imap_unorderedwriter第15个记录时,将消耗所有数据.现在假设处理步骤正在生成数亿行的字典,你可以看到我内存不足的原因.也许阿姆达尔的法律在行动.

有什么办法解决这个问题?我想我需要某种形式的缓冲区的Pool.imap_unordered,说:"一旦有X需要插入的记录,停止并等待,直到有小于X使更多的前".在最后一个记录被保存时,我应该能够从准备下一个记录中获得一些速度提升.

我试图使用NuMappapy模块(我修改与Python 3工作)做的正是这一点,但它是不是更快.事实上,它比顺序运行程序更糟糕; NuMap使用两个线程加上多个进程.

SQLite的批量导入功能可能不适合我的任务,因为数据需要大量处理和规范化.

我有大约85G的压缩文本要处理.我对其他数据库技术持开放态度,但选择SQLite是为了便于使用,因为这是一次写入多次读取的工作,在加载完所有内容后,只有3或4个人将使用生成的数据库.

Edm*_*ann 8

当我在解决同样的问题时,我认为防止池过载的有效方法是使用带有生成器的信号量:

from multiprocessing import Pool, Semaphore

def produce(semaphore, from_file):
    with open(from_file) as reader:
        for line in reader:
            # Reduce Semaphore by 1 or wait if 0
            semaphore.acquire()
            # Now deliver an item to the caller (pool)
            yield line

def process(item):
    result = (first_function(item),
              second_function(item),
              third_function(item))
    return result

def consume(semaphore, result):
    database_con.cur.execute("INSERT INTO ResultTable VALUES (?,?,?)", result)
    # Result is consumed, semaphore may now be increased by 1
    semaphore.release()

def main()
    global database_con
    semaphore_1 = Semaphore(1024)
    with Pool(2) as pool:
        for result in pool.imap_unordered(process, produce(semaphore_1, "workfile.txt"), chunksize=128):
            consume(semaphore_1, result)
Run Code Online (Sandbox Code Playgroud)

也可以看看:

K Hong - 多线程 - 信号量对象和线程池

Chris Terman 的演讲 - MIT 6.004 L21:信号量


unu*_*tbu 4

由于处理速度很快,但写入速度很慢,听起来你的问题是 I/O 密集型的。因此,使用多处理可能不会获得太多好处。

但是,可以剥离 的块data,处理该块,并等到该数据被写入后再剥离另一个块:

import itertools as IT
if __name__ == "__main__":
    data = records(100)
    with Pool(2) as pool:
        chunksize = ...
        for chunk in iter(lambda: list(IT.islice(data, chunksize)), []):
            writer(pool.imap_unordered(process, chunk, chunksize=5))
Run Code Online (Sandbox Code Playgroud)

  • 这似乎是最好的解决方案。这是让进程不同步和在处理步骤中获得一定速度提升之间的折衷方案。如果有一个“multiprocessing”函数可以使用某种缓冲区参数执行“imap”,那就太好了。 (2认同)