Python multiprocessing: tracking the progress of a pool.map operation

Asked by use*_*147 (score 10). Tags: python, pool, multiprocessing

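I have a function that performs a simulation and returns an array as a string.

I want to run the simulation (the function) over more than 10000 possible input parameter values and write the results to a single file.

I am using multiprocessing, specifically the pool.map function, to run the simulations in parallel.

Running the simulation function more than 10000 times takes a very long time, so I would really like to track the progress of the whole operation.

I think the problem with my current code below is that pool.map runs the function 10000 times with no progress tracking while those calls are running. Only once the parallel processing has finished all 10000 simulations (which could take anywhere from hours to days) does my progress tracking start, and then it only tracks the 10000 results being saved to the file. So it isn't really tracking the pool.map operation itself.

Is there an easy fix to my code that would give me progress tracking?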

import multiprocessing

import numpy as np


def simFunction(input):
    # Does some simulation and outputs simResult
    simResult = input  # placeholder: the actual simulation goes here
    return str(simResult)

# Parallel processing

inputs = np.arange(0,10000,1)

if __name__ == "__main__":
    numCores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes = numCores)
    t = pool.map(simFunction, inputs) 
    with open('results.txt','w') as out:
        print("Starting to simulate " + str(len(inputs)) + " input values...")
        counter = 0
        for i in t:
            out.write(i + '\n')
            counter = counter + 1
            if counter%100==0:
                print(str(counter) + " of " + str(len(inputs)) + " input values simulated")
    print('Finished!!!!')

Answer by Mik*_*rns (score 11)

Progress is easy to track if you use an iterative map.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> def simFunction(x,y):
...   import time
...   time.sleep(2)
...   return x**2 + y
... 
>>> x,y = range(100),range(-100,100,2)
>>> res = Pool().imap(simFunction, x,y)
>>> with open('results.txt', 'w') as out:
...   for i in x:
...     print(next(res), file=out)
...     if i % 10 == 0:
...       print("%s of %s simulated" % (i, len(x)))
... 
0 of 100 simulated
10 of 100 simulated
20 of 100 simulated
30 of 100 simulated
40 of 100 simulated
50 of 100 simulated
60 of 100 simulated
70 of 100 simulated
80 of 100 simulated
90 of 100 simulated
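For comparison, here is a minimal sketch of the same idea using the standard library's `multiprocessing.Pool.imap`, applied to the question's single-argument setup. `sim_function` and `run_with_progress` are hypothetical names, the squaring is only a placeholder for the real simulation, and `chunksize=10` is an arbitrary choice. Because `imap` yields results in input order as soon as they are available, each result can be written and counted while the remaining jobs are still running.

```python
import multiprocessing


def sim_function(value):
    # Hypothetical placeholder for the real simulation; returns a string
    return str(value * value)


def run_with_progress(inputs, path, report_every=100):
    """Write each result as soon as it arrives and print progress."""
    total = len(inputs)
    done = 0
    with multiprocessing.Pool() as pool, open(path, "w") as out:
        # imap yields results lazily and in input order, so this loop body
        # runs while the remaining jobs are still being computed
        for result in pool.imap(sim_function, inputs, chunksize=10):
            out.write(result + "\n")
            done += 1
            if done % report_every == 0 or done == total:
                print("%d of %d input values simulated" % (done, total))
    return done


if __name__ == "__main__":
    run_with_progress(list(range(10000)), "results.txt")
```

The standard-library `imap` passes exactly one argument per call, so a two-argument function like the pathos example would need a small wrapper that unpacks a tuple.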

Alternatively, you can use an asynchronous map. Here I'll do things a little differently, just to mix it up.

>>> import time
>>> res = Pool().amap(simFunction, x,y)
>>> while not res.ready():
...   print("waiting...")
...   time.sleep(5)
... 
waiting...
waiting...
waiting...
waiting...
>>> res.get()
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899]
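The standard library supports a similar polling pattern. The sketch below is a stand-in for `amap`, not pathos code: it assumes Python 3.3+ for `Pool.starmap_async`, which unpacks each `(x, y)` pair into `sim_function(x, y)`. The loop checks `ready()` and uses `wait(timeout)` to sleep for at most `poll_seconds` between checks. `run_async` and the 0.01 s sleep are hypothetical.

```python
import multiprocessing
import time


def sim_function(x, y):
    # Stand-in for a slow simulation, using the same toy formula as above
    time.sleep(0.01)
    return x ** 2 + y


def run_async(xs, ys, poll_seconds=0.5):
    with multiprocessing.Pool() as pool:
        # starmap_async returns an AsyncResult immediately
        res = pool.starmap_async(sim_function, zip(xs, ys))
        while not res.ready():
            print("waiting...")
            # Blocks until the result is ready or poll_seconds elapse
            res.wait(poll_seconds)
        return res.get()


if __name__ == "__main__":
    print(run_async(range(100), range(-100, 100, 2)))
```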

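Note that I'm using pathos.multiprocessing instead of multiprocessing. It's just a fork of multiprocessing that lets you map functions with multiple inputs, has much better serialization, and lets you make map calls anywhere (not just in __main__). You could also use multiprocessing for the code above, but the code would be slightly different.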

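An iterative or asynchronous map lets you write whatever code you want for finer-grained progress tracking. For example, pass a unique "id" to each job and watch which jobs come back, or have each job return its process ID. There are many ways to track progress and processes, but the above should give you a start.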
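As one possible version of the "unique id" idea, the sketch below tags each job with its index and has it also return `os.getpid()`. `tagged_job` and `track_jobs` are illustrative names, and the doubling stands in for the simulation. Because `imap_unordered` yields jobs in completion order, the ids are what allow the results to be put back in input order at the end.

```python
import multiprocessing
import os


def tagged_job(job):
    # job is a (job_id, value) pair; job_id is a hypothetical unique tag
    job_id, value = job
    result = value * 2  # placeholder for the real simulation
    # Return the tag and this worker's process id alongside the result
    return job_id, os.getpid(), result


def track_jobs(values):
    jobs = list(enumerate(values))
    pending = {job_id for job_id, _ in jobs}
    results = {}
    with multiprocessing.Pool() as pool:
        # imap_unordered yields each job as soon as it finishes, in any order
        for job_id, pid, result in pool.imap_unordered(tagged_job, jobs):
            pending.discard(job_id)
            results[job_id] = result
            print("job %d finished in process %d (%d still pending)"
                  % (job_id, pid, len(pending)))
    # Use the ids to restore the original input order
    return [results[job_id] for job_id in range(len(jobs))]


if __name__ == "__main__":
    track_jobs(range(20))
```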

You can get pathos here: https://github.com/uqfoundation


Answer by Jua*_*oco (score 5)

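I think what you need is a log file.

I would recommend the logging module, which is part of the Python standard library. Unfortunately, logging is not multiprocessing-safe, so you can't use it out of the box in your application.

So you need either a multiprocessing-safe log handler, your own handler built on a Queue, or locks combined with the logging module.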
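For the Queue-based option, Python 3.2+ ships `logging.handlers.QueueHandler` and `logging.handlers.QueueListener`. The following is a minimal sketch, assuming that only the parent process should ever touch the log file: each worker gets a `QueueHandler` through the pool's `initializer`, and a single `QueueListener` thread in the parent writes the records out. `init_worker`, `simulation`, `run`, and the file names are hypothetical.

```python
import logging
import logging.handlers
import multiprocessing


def init_worker(queue):
    # In each worker, send every log record to the shared queue, not a file
    root = logging.getLogger()
    root.handlers = [logging.handlers.QueueHandler(queue)]
    root.setLevel(logging.DEBUG)


def simulation(a):
    logging.debug("Simulating with %s", a)
    result = a * 2  # placeholder for the real simulation
    logging.debug("Finished simulation with %s. Result is %s", a, result)
    return result


def run(inputs, log_path):
    queue = multiprocessing.Queue()
    file_handler = logging.FileHandler(log_path)
    file_handler.setFormatter(logging.Formatter(
        "%(asctime)s %(process)s %(levelname)s %(message)s"))
    # Only this listener thread in the parent process writes to the file
    listener = logging.handlers.QueueListener(queue, file_handler)
    listener.start()
    try:
        pool = multiprocessing.Pool(initializer=init_worker, initargs=(queue,))
        results = pool.map(simulation, inputs)
        # close() + join() lets workers exit normally, flushing queued records
        pool.close()
        pool.join()
    finally:
        listener.stop()  # processes remaining records, then joins the thread
        file_handler.close()
    return results


if __name__ == "__main__":
    run(list(range(100)), "results.log")
```

Calling `close()` and `join()` instead of relying on `with Pool(...)`, whose exit calls `terminate()`, gives each worker's queue feeder thread a chance to flush its records before the listener is stopped.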

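There is a lot of discussion about this on Stack Overflow, for example: How should I log while using multiprocessing in Python?

If most of the CPU load is in the simulation function and you don't plan to use log rotation, you can use a simple locking mechanism like this: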

import multiprocessing
import logging

from random import random
import time


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(process)s %(levelname)s %(message)s',
    filename='results.log',
    filemode='a'
)

# The lock must be created once and shared by all workers; creating a new
# multiprocessing.Lock() inside each call would not exclude anything.
lock = None


def init_worker(shared_lock):
    # Runs once in each worker process: keep a reference to the shared lock
    global lock
    lock = shared_lock


def simulation(a):
    # logging
    with lock:
        logging.debug("Simulating with %s", a)

    # simulation
    time.sleep(random())
    result = a*2

    # logging
    with lock:
        logging.debug("Finished simulation with %s. Result is %s", a, result)

    return result

if __name__ == '__main__':

    logging.debug("Starting the simulation")
    inputs = list(range(100))
    num_cores = multiprocessing.cpu_count()
    print("num_cores: %d" % num_cores)
    lock = multiprocessing.Lock()
    pool = multiprocessing.Pool(processes=num_cores,
                                initializer=init_worker, initargs=(lock,))
    t = pool.map(simulation, inputs)
    pool.close()
    pool.join()
    logging.debug("The simulation has ended")

You can `tail -f` your log file while it runs. This is what you should see:

2015-02-08 18:10:00,616 3957 DEBUG Starting the simulation
2015-02-08 18:10:00,819 3961 DEBUG Simulating with 12
2015-02-08 18:10:00,861 3965 DEBUG Simulating with 28
2015-02-08 18:10:00,843 3960 DEBUG Simulating with 20
2015-02-08 18:10:00,832 3959 DEBUG Simulating with 16
2015-02-08 18:10:00,812 3958 DEBUG Simulating with 8
2015-02-08 18:10:00,798 3963 DEBUG Simulating with 4
2015-02-08 18:10:00,855 3964 DEBUG Simulating with 24
2015-02-08 18:10:00,781 3962 DEBUG Simulating with 0
2015-02-08 18:10:00,981 3961 DEBUG Finished simulation with 12. Result is 24
2015-02-08 18:10:00,981 3961 DEBUG Simulating with 13
2015-02-08 18:10:00,991 3958 DEBUG Finished simulation with 8. Result is 16
2015-02-08 18:10:00,991 3958 DEBUG Simulating with 9
2015-02-08 18:10:01,130 3964 DEBUG Finished simulation with 24. Result is 48
2015-02-08 18:10:01,131 3964 DEBUG Simulating with 25
2015-02-08 18:10:01,134 3964 DEBUG Finished simulation with 25. Result is 50
2015-02-08 18:10:01,134 3964 DEBUG Simulating with 26
2015-02-08 18:10:01,315 3961 DEBUG Finished simulation with 13. Result is 26
2015-02-08 18:10:01,316 3961 DEBUG Simulating with 14
2015-02-08 18:10:01,391 3961 DEBUG Finished simulation with 14. Result is 28
2015-02-08 18:10:01,391 3961 DEBUG Simulating with 15
2015-02-08 18:10:01,392 3963 DEBUG Finished simulation with 4. Result is 8
2015-02-08 18:10:01,393 3963 DEBUG Simulating with 5

Tested on Windows and Linux.

Hope this helps.