Python concurrent.futures和ProcessPoolExecutor提供了一个简洁的界面来安排和监视任务.期货甚至提供 .cancel()方法:
cancel():尝试取消通话.如果当前正在执行调用且无法取消,则该方法将返回False,否则将取消调用并且该方法将返回True.
不幸的是,在一个类似的问题(关于asyncio)中,回答声称运行任务是不可取消的,使用这个剪切的文档,但文档不要说,只有它们运行和不可解决.
向进程提交multiprocessing.Events也是不可能的(通过参数执行此操作,如在multiprocess.Process中返回RuntimeError)
我想做什么?我想分区搜索空间并为每个分区运行任务.但它足以拥有一个解决方案,而且这个过程是CPU密集型的.那么有一种实际的舒适方式可以通过使用ProcessPool开始来抵消收益吗?
例:
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait
# function that profits from partitioned search space
def m_run(partition):
for elem in partition:
if elem == 135135515:
return elem
return False
futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
for i in range(4):
# run 4 tasks with a partition, but only *one* solution is needed
partition = range(i*steps,(i+1)*steps) …Run Code Online (Sandbox Code Playgroud) 我想使用 python ThreadPoolExecutor (附加代码)同时调用两个 api。如果这两个 api 调用中的任何一个有响应,我想停止调用另一个。因为对于我的用例,两个 api 之一将需要很长时间才能返回响应,我想避免调用。
def get_rest_api_response(url):
return requets.get(url)
import requests, os
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=4) as executor:
f1 = executor.submit(get_rest_api_response, url="REST_API_URL_1")
f2 = executor.submit(get_rest_api_response, url="REST_API_URL_2")
no_future_is_done = True
while(no_future_is_done):
if f1.done():
no_future_is_done = False
print("f1 is done")
output = f1.result()
print(f2.cancel()) ######------> Failing!
if f2.done():
no_future_is_done = False
print("f2 is done")
output = f2.result()
print(f1.cancel()) ######-------> Failing!
print(output)Run Code Online (Sandbox Code Playgroud)
我正在使用 future.cancel() 但它失败并返回 False。 https://pd.codechef.com/docs/py/3.4.2/library/concurrent.futures.html#concurrent.futures.Future.cancel
我还有其他方法可以实现这一目标吗?
python multithreading threadpool python-multithreading threadpoolexecutor
我正在运行一段 python 代码,其中多个线程通过线程池执行程序运行。每个线程都应该执行一项任务(例如获取网页)。我希望能够做的是终止所有线程,即使其中一个线程失败。例如:
with ThreadPoolExecutor(self._num_threads) as executor:
jobs = []
for path in paths:
kw = {"path": path}
jobs.append(executor.submit(start,**kw))
for job in futures.as_completed(jobs):
result = job.result()
print(result)
def start(*args,**kwargs):
#fetch the page
if(success):
return True
else:
#Signal all threads to stop
Run Code Online (Sandbox Code Playgroud)
有可能这样做吗?除非所有线程都成功,否则线程返回的结果对我来说是无用的,因此即使其中一个失败,我也想节省其余线程的一些执行时间并立即终止它们。实际代码显然正在执行相对冗长的任务,有几个故障点。
我正在尝试在事件循环之外使用协程函数.(在这种情况下,我想在Django中调用一个也可以在事件循环中使用的函数)
如果不将调用函数作为协程,似乎没有办法做到这一点.
我意识到Django是为了阻塞而构建的,因此与asyncio不兼容.虽然我认为这个问题可能会帮助那些正在进行转换或使用遗留代码的人.
就此而言,它可能有助于理解异步编程以及为什么它不能用于阻塞代码.
这个小片段打印 False
import subprocess
from concurrent import futures
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future=executor.submit(subprocess.call, ['sleep', '2'])
print(future.cancel())
Run Code Online (Sandbox Code Playgroud)
根据文档,这意味着"未来无法取消":https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.cancel
cancel():尝试取消通话.如果当前正在执行调用且无法取消,则该方法将返回False,否则将取消调用并且该方法将返回True.
有没有办法取消未来,虽然cancel()方法告诉我它不能被取消?
我在Python 2.7上运行backport
一般来说,我对异步编程和python还是很陌生,但是我想在状态机内实现异步功能。要求如下:
为了实现目标1和2,我已经成功实现了我的异步例程,以在执行器中运行,如下所示:asyncio:是否可以取消由执行器运行的未来?。调试时,我看到我也完成了#2,因为原始线程继续成功执行。但是,我很难实现回调函数。请注意,此回调函数必须在原始线程上运行,因为它将更改分配给它的对象的状态。我该怎么做?
我的代码:
class Free(State):
def write(self, manager):
write_future = self.write_async(manager)
self.set_write_future(write_future)
self.change_state(manager, busy_state)
def write_async(self, manager):
event = threading.Event()
write_future = asyncio.get_event_loop().run_in_executor(None, self.async_write, 10, event)
write_future.add_done_callback(functools.partial(self.async_write_callback, manager))
return event
def async_write(self, seconds_to_block, event):
for i in range(seconds_to_block):
if event.is_set():
return
print('writing {}/{}'.format(i, seconds_to_block))
time.sleep(1)
print('done writing {}'.format(seconds_to_block))
def async_write_callback(self, manager):
#never gets called
self.terminate_future()
self.change_state(manager, free_state)
Run Code Online (Sandbox Code Playgroud) 类似的问题(但答案对我不起作用):如何取消使用 concurrent.futures.ProcessPoolExecutor 运行的长时间运行的子进程?
与上面链接的问题和提供的解决方案不同,在我的情况下,计算本身相当长(受 CPU 限制)并且无法循环运行以检查是否发生了某些事件。
以下代码的简化版本:
import asyncio
import concurrent.futures as futures
import time
class Simulator:
def __init__(self):
self._loop = None
self._lmz_executor = None
self._tasks = []
self._max_execution_time = time.monotonic() + 60
self._long_running_tasks = []
def initialise(self):
# Initialise the main asyncio loop
self._loop = asyncio.get_event_loop()
self._loop.set_default_executor(
futures.ThreadPoolExecutor(max_workers=3))
# Run separate processes of long computation task
self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3)
def run(self):
self._tasks.extend(
[self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]]
)
try:
# Gather bot reasoner tasks
_reasoner_tasks = …Run Code Online (Sandbox Code Playgroud) python subprocess python-asyncio python-multiprocessing process-pool
我正在运行需要时间才能完成的功能。用户可以选择停止此功能/事件。有没有简单的方法来停止线程或循环?
class ThreadsGenerator:
MAX_WORKERS = 5
def __init__(self):
self._executor = ThreadPoolExecutor(max_workers=self.MAX_WORKERS)
self.loop = None
self.future = None
def execute_function(self, function_to_execute, *args):
self.loop = asyncio.get_event_loop()
self.future = self.loop.run_in_executor(self._executor, function_to_execute, *args)
return self.future
Run Code Online (Sandbox Code Playgroud)
我想在用户单击停止按钮时尽快停止该功能,而不是等待完成其工作。
提前致谢!
python multithreading python-3.x python-asyncio concurrent.futures
python ×7
asynchronous ×1
multiprocess ×1
process-pool ×1
python-3.x ×1
subprocess ×1
threadpool ×1