python multiprocessing:写入同一个excel文件

Sup*_*ale 5 python queue locking multiprocessing

我是Python新手,我试图将五个不同进程的结果保存到一个excel文件中(每个进程写入不同的工作表)。我在这里阅读了不同的帖子,但仍然无法完成它,因为我对 pool.map、队列和锁非常困惑,而且我不确定这里需要什么来完成这项任务。到目前为止,这是我的代码:

list_of_days = ["2017.03.20", "2017.03.21", "2017.03.22", "2017.03.23", "2017.03.24"]
results = pd.DataFrame()

if __name__ == '__main__':
    global list_of_days
    writer = pd.ExcelWriter('myfile.xlsx', engine='xlsxwriter')
    nr_of_cores = multiprocessing.cpu_count()
    l = multiprocessing.Lock()
    pool = multiprocessing.Pool(processes=nr_of_cores, initializer=init, initargs=(l,))
    pool.map(f, range(len(list_of_days)))
    pool.close()
    pool.join()

def init(l):
    global lock
    lock = l

def f(k):
    global results

    *** DO SOME STUFF HERE***

    results = results[ *** finished pandas dataframe *** ]

    lock.acquire()
    results.to_excel(writer, sheet_name=list_of_days[k])
    writer.save()
    lock.release()
Run Code Online (Sandbox Code Playgroud)

结果是在 Excel 中只创建了一张工作表(我认为这是最后完成的过程)。关于这段代码的一些问题:

  • 如何避免定义全局变量?
  • 是否可以传递数据帧?
  • 我应该将锁定移至 main 吗?

非常感谢这里的一些意见,因为我认为掌握多处理是有帮助的。谢谢

sto*_*vfl 3

1)为什么你在第二个方法中的几个地方实现了 time.sleep ?

__main__, time.sleep(0.1), 中给process已启动的启动时间片。
在 中f2(fq, q),给出queue一个时间片来将所有缓冲的数据刷新到管道并q.get_nowait()使用。
在 中w(q),仅用于模拟长期运行的测试writer.to_excel(...),我删除了这一点。

2) pool.map 和 pool = [mp.Process( . )] 有什么区别?

使用pool.map不需要Queue,没有参数传递,更短的代码。必须worker_process立即返回result并终止。 pool.map只要所有的事情都iteration完成了,就开始一个新的过程。results之后必须 对其进行处理。

使用pool = [mp.Process( . )], 开始n processes。Aprocess终止于queue.Empty

您能想到一种情况,您会更喜欢其中一种方法而不是另一种方法吗?

方法1:快速设置,序列化,只对结果感兴趣才能继续。
方法2:如果你想并行执行所有工作负载


不能在流程中使用global writer
writer实例必须属于一个process

的用法 mp.Pool,例如:

def f1(k):
  # *** DO SOME STUFF HERE***
  results = pd.DataFrame(df_)
  return results

if __name__ == '__main__':
    pool = mp.Pool()
    results = pool.map(f1, range(len(list_of_days)))

    writer = pd.ExcelWriter('../test/myfile.xlsx', engine='xlsxwriter')
    for k, result in enumerate(results):
        result.to_excel(writer, sheet_name=list_of_days[k])

    writer.save()
    pool.close()
Run Code Online (Sandbox Code Playgroud)

这就导致了流程.to_excel(...)中被顺序调用__main__


如果你想要并行,.to_excel(...)你必须使用mp.Queue().
例如:

流程worker

# mp.Queue exeptions have to load from
try:
    # Python3
    import queue
except:
    # Python 2
    import Queue as queue

def f2(fq, q):
    while True:
        try:
            k = fq.get_nowait()
        except queue.Empty:
            exit(0)

        # *** DO SOME STUFF HERE***

        results = pd.DataFrame(df_)
        q.put( (list_of_days[k], results) )
        time.sleep(0.1)  
Run Code Online (Sandbox Code Playgroud)

流程writer

def w(q):
    writer = pd.ExcelWriter('myfile.xlsx', engine='xlsxwriter')
    while True:
        try:
            titel, result = q.get()
        except ValueError:
            writer.save()
            exit(0)

        result.to_excel(writer, sheet_name=titel)
Run Code Online (Sandbox Code Playgroud)

流程__main__

if __name__ == '__main__':
    w_q = mp.Queue()
    w_p = mp.Process(target=w, args=(w_q,))
    w_p.start()
    time.sleep(0.1)

    f_q = mp.Queue()
    for i in range(len(list_of_days)):
        f_q.put(i)

    pool = [mp.Process(target=f2, args=(f_q, w_q,)) for p in range(os.cpu_count())]
    for p in pool:
        p.start()
        time.sleep(0.1)

    for p in pool:
        p.join()

    w_q.put('STOP')
    w_p.join()
Run Code Online (Sandbox Code Playgroud)

使用 Python 进行测试:3.4.2 - pandas:0.19.2 - xlsxwriter:0.9.6