我正在调用Python中的一个函数,我知道它可能会停止并迫使我重新启动脚本.
如何调用该函数或我将其包装成什么,以便如果它花费的时间超过5秒,脚本会取消它并执行其他操作?
我先研究过,找不到我的问题的答案.我试图在Python中并行运行多个函数.
我有这样的事情:
files.py
import common #common is a util class that handles all the IO stuff
dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]
def func1():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir1)
c.getFiles(dir1)
time.sleep(10)
c.removeFiles(addFiles[i], dir1)
c.getFiles(dir1)
def func2():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir2)
c.getFiles(dir2)
time.sleep(10)
c.removeFiles(addFiles[i], dir2)
c.getFiles(dir2)
Run Code Online (Sandbox Code Playgroud)
我想调用func1和func2并让它们同时运行.这些函数不会相互交互或在同一个对象上交互.现在我必须等待func1在func2启动之前完成.我如何做以下事情:
process.py
from files import func1, func2
runBothFunc(func1(), func2()) …Run Code Online (Sandbox Code Playgroud) 哪些因素决定了chunksize方法的最佳参数multiprocessing.Pool.map()?该.map()方法似乎使用任意启发式作为其默认的chunksize(如下所述); 是什么推动了这种选择,是否有基于某些特定情况/设置的更周到的方法?
示例 - 说我是:
iterable到.map()拥有约1500万个元素的元素;processes = os.cpu_count()内multiprocessing.Pool().我天真的想法是给每24个工人一个同样大小的块,即15_000_000 / 24625,000.大块应该在充分利用所有工人的同时减少营业额/管理费用.但似乎缺少给每个工人提供大批量的一些潜在缺点.这是不完整的图片,我错过了什么?
我的部分问题源于if chunksize=None:both .map()和.starmap()call 的默认逻辑,.map_async()如下所示:
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
error_callback=None):
# ... (materialize `iterable` to list if it's an iterator)
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4) # ????
if extra:
chunksize += 1
if len(iterable) == 0:
chunksize = …Run Code Online (Sandbox Code Playgroud) python parallel-processing multiprocessing python-3.x python-multiprocessing
我在类中有一个方法,该方法需要循环执行大量工作,我想将工作分散到我的所有核心上。
我编写了以下代码,如果我使用normal map,则可以使用,但是pool.map返回错误。
import multiprocessing
pool = multiprocessing.Pool(multiprocessing.cpu_count() - 1)
class OtherClass:
def run(sentence, graph):
return False
class SomeClass:
def __init__(self):
self.sentences = [["Some string"]]
self.graphs = ["string"]
def some_method(self):
other = OtherClass()
def single(params):
sentences, graph = params
return [other.run(sentence, graph) for sentence in sentences]
return list(pool.map(single, zip(self.sentences, self.graphs)))
SomeClass().some_method()
Run Code Online (Sandbox Code Playgroud)
错误:
AttributeError:无法腌制本地对象“ SomeClass.some_method..single”
为什么不能泡菜single?我什至尝试移动single到全局模块范围(不在类内部-使其独立于上下文):
import multiprocessing
pool = multiprocessing.Pool(multiprocessing.cpu_count() - 1)
class OtherClass:
def run(sentence, graph):
return False
def single(params):
other = …Run Code Online (Sandbox Code Playgroud) 我相信之前已经提出过许多类似的问题,但在阅读了很多类似的问题后,我仍然不太确定应该做些什么.所以,我有一个Python脚本来控制一些外部仪器(摄像头和功率计).我通过使用ctypes调用.dll文件中的C函数为两种乐器编写了类.现在它看起来像这样:
for i in range(10):
power_reading = newport.get_reading(N=100,interval=1) # take power meter reading
img = camera.capture(N=10)
value = image_processing(img) # analyze the img (ndarray) to get some values
results.append([power_reading,value]) # add both results to a list
Run Code Online (Sandbox Code Playgroud)
我想同时开始执行前两行.双方newport.get_reading并camera.capture需要100ms-1运行(他们会在同一时间运行,如果我选择了正确的参数).我不需要它们在同一时间完全启动,但理想情况下延迟应该小于总运行时间的大约10-20%(因此当每次运行时运行时间小于0.2秒).根据我的阅读,我可以使用该multiprocessing模块.所以我根据这篇文章尝试这样的事情:
def p_get_reading(newport,N,interval,return_dict):
reading = newport.get_reading(N,interval,return_dict)
return_dict['power_meter'] = reading
def p_capture(camera,N,return_dict):
img = camera.capture(N)
return_dict['image'] = img
for i in range(10):
manager = multiprocessing.Manager()
return_dict = manager.dict()
p = multiprocessing.Process(target=p_capture, args=(camera,10))
p.start()
p2 = multiprocessing.Process(target=p_get_reading, args=(newport,100,1)) …Run Code Online (Sandbox Code Playgroud) python multiprocessing python-multithreading python-3.x python-multiprocessing
多处理模块对于python初学者来说非常困惑,特别是那些刚刚从MATLAB迁移并且使用并行计算工具箱变得懒惰的人.我有以下功能需要大约80秒运行,我想通过使用Python的多处理模块来缩短这个时间.
from time import time
xmax = 100000000
start = time()
for x in range(xmax):
y = ((x+5)**2+x-40)
if y <= 0xf+1:
print('Condition met at: ', y, x)
end = time()
tt = end-start #total time
print('Each iteration took: ', tt/xmax)
print('Total time: ', tt)
Run Code Online (Sandbox Code Playgroud)
这按预期输出:
Condition met at: -15 0
Condition met at: -3 1
Condition met at: 11 2
Each iteration took: 8.667453265190124e-07
Total time: 86.67453265190125
Run Code Online (Sandbox Code Playgroud)
由于循环的任何迭代都不依赖于其他循环,我尝试从官方文档中采用此服务器进程来在单独的进程中扫描范围的块.最后我想出了vartec对这个问题的回答,可以准备以下代码.我还根据Darkonaut对当前问题的回答更新了代码.
from time import time
import multiprocessing as …Run Code Online (Sandbox Code Playgroud) python parallel-processing multiprocessing python-3.x python-multiprocessing
假设我有下面的代码,一个执行某些操作的函数,它在进程中启动,并返回一个值。
from multiprocessing import Process
def my_func(arg):
return 'Hello, ' + arg
p1 = Process(target=my_func, args=('John',)
p1.start()
p1.join()
Run Code Online (Sandbox Code Playgroud)
如何获取函数的返回值?
我正在尝试为此循环实现多处理.它无法修改数组,或者似乎没有正确排序作业(在最后一个函数完成之前返回数组).
import multiprocessing
import numpy
def func(i, array):
array[i] = i**2
print(i**2)
def main(n):
array = numpy.zeros(n)
if __name__ == '__main__':
jobs = []
for i in range(0, n):
p = multiprocessing.Process(target=func, args=(i, array))
jobs.append(p)
p.start()
return array
print(main(10))
Run Code Online (Sandbox Code Playgroud) 在 Python 3.6 中,我并行运行多个进程,其中每个进程 ping 一个 URL 并返回一个 Pandas 数据帧。我想继续运行(2+)个进程,我创建了一个最小的代表性示例,如下所示。
我的问题是:
1)我的理解是,由于我有不同的功能,我不能使用Pool.map_async()及其变体。是对的吗?我见过的唯一例子是重复相同的功能,就像这个答案一样。
2)使此设置永久运行的最佳实践是什么?在下面的代码中,我使用了一个while循环,我怀疑它不适合此目的。
3)Process我使用和 的方式是Manager最佳的吗?我使用multiprocessing.Manager.dict()共享字典来返回进程的结果。我在这个答案的评论中看到,使用Queuehere是有意义的,但是该Queue对象没有“.dict()”方法。所以,我不确定这会如何运作。
对于示例代码的任何改进和建议,我将不胜感激。
import numpy as np
import pandas as pd
import multiprocessing
import time
def worker1(name, t , seed, return_dict):
'''worker function'''
print(str(name) + 'is here.')
time.sleep(t)
np.random.seed(seed)
df= pd.DataFrame(np.random.randint(0,1000,8).reshape(2,4), columns=list('ABCD'))
return_dict[name] = [df.columns.tolist()] + df.values.tolist()
def worker2(name, t, seed, return_dict):
'''worker function'''
print(str(name) + …Run Code Online (Sandbox Code Playgroud)