Python 多处理调度

Zhu*_*arb 3 queue scheduled-tasks multiprocessing python-3.x

在 Python 3.6 中,我并行运行多个进程,其中每个进程 ping 一个 URL 并返回一个 Pandas 数据帧。我想继续运行(2+)个进程,我创建了一个最小的代表性示例,如下所示。

我的问题是:

1)我的理解是,由于我有不同的功能,我不能使用Pool.map_async()及其变体。是对的吗?我见过的唯一例子是重复相同的功能,就像这个答案一样。

2)使此设置永久运行的最佳实践是什么?在下面的代码中,我使用了一个while循环,我怀疑它不适合此目的。

3)Process我使用和 的方式是Manager最佳的吗?我使用multiprocessing.Manager.dict()共享字典来返回进程的结果。我在这个答案的评论中看到,使用Queuehere是有意义的,但是该Queue对象没有“.dict()”方法。所以,我不确定这会如何运作。

对于示例代码的任何改进和建议,我将不胜感激。

import numpy as np
import pandas as pd
import multiprocessing
import time

def worker1(name, t , seed, return_dict):
    '''worker function'''
    print(str(name) + 'is here.')
    time.sleep(t)
    np.random.seed(seed)
    df= pd.DataFrame(np.random.randint(0,1000,8).reshape(2,4), columns=list('ABCD'))
    return_dict[name] = [df.columns.tolist()] + df.values.tolist()

def worker2(name, t, seed, return_dict):
    '''worker function'''
    print(str(name) + 'is here.')
    np.random.seed(seed)
    time.sleep(t)
    df = pd.DataFrame(np.random.randint(0, 1000, 12).reshape(3, 4), columns=list('ABCD'))

    return_dict[name] = [df.columns.tolist()] + df.values.tolist()

if __name__ == '__main__':
    t=1
    while True:

        start_time = time.time()
        manager = multiprocessing.Manager()
        parallel_dict = manager.dict()
        seed=np.random.randint(0,1000,1) # send seed to worker to return a diff df
        jobs = []
        p1 = multiprocessing.Process(target=worker1, args=('name1', t, seed, parallel_dict))
        p2 = multiprocessing.Process(target=worker2, args=('name2', t, seed+1, parallel_dict))
        jobs.append(p1)
        jobs.append(p2)
        p1.start()
        p2.start()
        for proc in jobs:
            proc.join()
        parallel_end_time = time.time() - start_time
        #print(parallel_dict)
        df1= pd.DataFrame(parallel_dict['name1'][1:],columns=parallel_dict['name1'][0])
        df2 = pd.DataFrame(parallel_dict['name2'][1:], columns=parallel_dict['name2'][0])
        merged_df = pd.concat([df1,df2], axis=0)
        print(merged_df)
Run Code Online (Sandbox Code Playgroud)

Tom*_*ana 6

答案1(多个功能的映射)

从技术上来说你是对的。对于 map、map_async 和其他变体,您应该使用单个函数。

但是可以通过实现执行器并将要执行的函数作为参数的一部分传递来绕过此约束:

def dispatcher(args):
    return args[0](*args[1:])
Run Code Online (Sandbox Code Playgroud)

所以一个最小的工作示例:

import multiprocessing as mp

def function_1(v):
    print("hi %s"%v)
    return 1
    
def function_2(v):
    print("by %s"%v)
    return 2

def dispatcher(args):
    return args[0](*args[1:])

with mp.Pool(2) as p:
    tasks = [
        (function_1, "A"),
        (function_2, "B")
    ]
    r = p.map_async(dispatcher, tasks)
    r.wait()
    results = r.get()
Run Code Online (Sandbox Code Playgroud)

答案2(日程安排)

我会从脚本中删除 while 并安排一个 cron 作业(在 GNU/Linux 上)(在 Windows 上),以便操作系统负责它的执行。

在 Linux 上,您可以运行cronotab -e并添加以下行以使脚本每 5 分钟运行一次。

*/5 * * * * python /path/to/script.py
Run Code Online (Sandbox Code Playgroud)

答案 3(共享词典)

是的,但不是。

据我所知,使用管理器来处理集合等数据是最好的方法。对于数组或基本类型(int、float、ecc)来说,存在 Value哪些Array 更快

文档中所示

Manager() 返回的管理器对象控制一个服务器进程,该进程保存 > Python 对象并允许其他进程使用代理来操作它们。

Manager() 返回的管理器将支持类型 list、dict、Namespace、Lock、> RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue、Value 和 > Array。

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以由网络上不同计算机上的进程共享。然而,它们比使用共享内存慢。

但您只需返回一个 Dataframe,因此不需要共享字典。

清理的代码

使用之前的所有想法,代码可以重写为:

地图版本

import numpy as np
import pandas as pd
from time import sleep
import multiprocessing as mp

def worker1(t , seed):
    print('worker1 is here.')
    sleep(t)
    np.random.seed(seed)
    return pd.DataFrame(np.random.randint(0,1000,8).reshape(2,4), columns=list('ABCD'))
     

def worker2(t , seed):
    print('worker2 is here.')
    sleep(t)
    np.random.seed(seed)
    return pd.DataFrame(np.random.randint(0, 1000, 12).reshape(3, 4), columns=list('ABCD'))

def dispatcher(args):
    return args[0](*args[1:])

def task_generator(sleep_time=1):
    seed = np.random.randint(0,1000,1)
    yield worker1, sleep_time, seed    
    yield worker2, sleep_time, seed + 1

with mp.Pool(2) as p:
    results = p.map(dispatcher, task_generator())
    merged = pd.concat(results, axis=0)
    print(merged)
Run Code Online (Sandbox Code Playgroud)

如果 Dataframe 的串联过程是瓶颈,则使用 imap 的方法可能会成为最佳选择。

imap版本

with mp.Pool(2) as p:
    merged = pd.DataFrame()
    for result in p.imap_unordered(dispatcher, task_generator()):
        merged = pd.concat([merged,result], axis=0)
    print(merged)
Run Code Online (Sandbox Code Playgroud)

主要区别在于,在map情况下,程序首先等待所有流程任务结束,然后连接所有Dataframe。

而在 imap_unoredered 情况下,一旦任务结束,数据帧就会连接到当前结果。

  • 没问题!map 将任务分配给池中的所有进程。所以是平行的!区别在于map是阻塞的,而map_async是非阻塞的。因此,使用map,你必须等待所有任务完成,而使用map_async,你将获得一些结果对象,然后你必须调用result.wait()和result.get()。 (2认同)