Python:如何并行运行python函数?

lmc*_*ory 89 python

我先研究过,找不到我的问题的答案.我试图在Python中并行运行多个函数.

我有这样的事情:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)
Run Code Online (Sandbox Code Playgroud)

我想调用func1和func2并让它们同时运行.这些函数不会相互交互或在同一个对象上交互.现在我必须等待func1在func2启动之前完成.我如何做以下事情:

process.py

from files import func1, func2

runBothFunc(func1(), func2())
Run Code Online (Sandbox Code Playgroud)

我希望能够创建非常接近同一时间的两个目录,因为每分钟我都在计算正在创建的文件数量.如果目录不在那里,它会甩掉我的时间.

NPE*_*NPE 131

你可以使用threadingmultiprocessing.

由于CPython的特殊性,threading不太可能实现真正的并行性.出于这个原因,multiprocessing通常是一个更好的选择.

这是一个完整的例子:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()
Run Code Online (Sandbox Code Playgroud)

启动/加入子进程的机制可以很容易地封装到一个函数中runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)
Run Code Online (Sandbox Code Playgroud)

  • 我使用了你的代码,但功能仍然没有同时启动. (4认同)
  • @Lamar McAdory:请解释一下"同时"到底是什么意思,或许给出一个具体的例子,说明你做了什么,你期待发生什么,以及实际发生了什么. (4认同)
  • @Lamar:你永远不能保证"完全相同的时间",并认为你可以是完全错误的.根据您拥有的cpu数量,计算机的负载,计算机上发生的许多事情的时间都将影响线程/进程启动的时间.此外,由于流程在创建后立即启动,因此创建流程的开销也必须按您看到的时差计算. (4认同)
  • 如果我的函数采用参数,并且当我在从单独的进程调用它们时传递参数时,它们不会同时运行。你能帮忙吗 (2认同)
  • 如果函数有参数怎么办? (2认同)

BIC*_*ube 12

似乎您有一个函数需要调用两个不同的参数。这可以使用Python 3.2+的组合concurrent.futures和优雅地完成map

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]
Run Code Online (Sandbox Code Playgroud)

现在,如果您的操作是 IO 绑定的,那么您可以这样使用ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
Run Code Online (Sandbox Code Playgroud)

请注意map此处如何将map您的函数用于参数列表。

现在,如果您的函数受 CPU 限制,那么您可以使用 ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
Run Code Online (Sandbox Code Playgroud)

如果您不确定,您可以简单地尝试这两种方法,看看哪一种能给您带来更好的结果。

最后,如果您想打印结果,您可以简单地执行以下操作:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)
Run Code Online (Sandbox Code Playgroud)


Ion*_*ica 11

可以使用Ray优雅地完成此任务,该系统使您可以轻松地并行化和分发Python代码。

要并行处理示例,您需要使用@ray.remote装饰器定义函数,然后使用调用它们.remote

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 
Run Code Online (Sandbox Code Playgroud)

如果将相同的参数传递给两个函数且参数较大,则使用的更有效方法ray.put()。这样可以避免将大参数序列化两次并为其创建两个内存副本:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])
Run Code Online (Sandbox Code Playgroud)

如果func1()func2()返回结果,则需要按以下方式重写代码:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func1.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])
Run Code Online (Sandbox Code Playgroud)

多处理模块相比,使用Ray有许多优点。特别是,相同的代码将在单台计算机以及一台计算机集群上运行。有关Ray的更多优点,请参见此相关文章


bru*_*iuz 8

2021 年最简单的方法是使用 asyncio:

import asyncio, time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():

    task1 = asyncio.create_task(
        say_after(4, 'hello'))

    task2 = asyncio.create_task(
        say_after(3, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

参考:

[1] https://docs.python.org/3/library/asyncio-task.html

  • 如果我没记错的话,这不是真正的并行。Asyncio 将使用阻塞时间来运行另一个任务..所以,一次。只有一个任务正在执行 (4认同)
  • @user3786340 是对的,你可以在这里看到这篇文章中的论点:https://towardsdatascience.com/concurrency-and-parallelism-in-python-bbd7af8c6625 它说:“但是 asyncio 任务背后的想法与线程不同事实上,任务在单个线程上运行。但是,如果第一个任务正在等待其响应而不是阻塞它,则每个任务都允许操作系统运行另一个任务。这就是异步 IO 的本质。(更彻底的演练-在后面的文章中通过异步程序)。” (3认同)
  • 错误的答案。Asyncio 不适用于并行性 (2认同)

Dav*_*ter 7

如果您的函数主要用于I / O工作(而CPU工作量较少),并且您具有Python 3.2+,则可以使用ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])
Run Code Online (Sandbox Code Playgroud)

如果您的功能主要是在做CPU工作(而I / O则更少),并且您拥有Python 2.6+,则可以使用多处理模块:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])
Run Code Online (Sandbox Code Playgroud)


Aru*_*raj 5

如果您是Windows用户并且使用python 3,那么本文将帮助您在python中进行并行编程。当您运行常规的多处理库的池编程时,您将在程序中遇到关于主要功能的错误。这是因为Windows没有fork()功能。下面的帖子提供了上述问题的解决方案。

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

由于我使用的是python 3,因此我对该程序进行了如下更改:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)
Run Code Online (Sandbox Code Playgroud)

使用此功能后,上面的问题代码也做了如下更改:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])
Run Code Online (Sandbox Code Playgroud)

我得到的输出为:

[1, 8, 27, 64, 125, 216]
Run Code Online (Sandbox Code Playgroud)

我认为这篇文章可能对某些Windows用户有用。