Python ThreadPoolExecutor - is the callback guaranteed to run in the same thread as the submitted func?

Pra*_*ota 12 python multithreading callback concurrent.futures

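In a ThreadPoolExecutor (TPE), is the callback always guaranteed to run in the same thread as the submitted function?

For example, I tested this with the following code. I ran it many times, and it seemed that func and callback always ran in the same thread.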

import concurrent.futures 
import random 
import threading 
import time 

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) 

def func(x): 
    time.sleep(random.random()) 
    return threading.current_thread().name 

def callback(future): 
    time.sleep(random.random()) 
    x = future.result() 
    cur_thread = threading.current_thread().name 
    if (cur_thread != x): 
        print(cur_thread, x) 

print('main thread: %s' % threading.current_thread()) 
for i in range(10000): 
    future = executor.submit(func, i) 
    future.add_done_callback(callback) 

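However, when I remove the time.sleep(random.random()) statements, it seems to fail, i.e. at least a few of the func calls and their callbacks do not run in the same thread.

For the project I'm working on, the callback must always run on the same thread as the submitted function, so I want to be sure this is guaranteed by TPE. (The results of the test without the random sleep also seem puzzling.)

I looked at the executor source code, and it does not look like the callback is switched over to the main thread before it runs. But I just want to be sure.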

dan*_*ano 10

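The documentation does not guarantee which thread a callback runs in. The only documented guarantee is that the callback will run in a thread belonging to the process that added it. Since you are using a ThreadPoolExecutor rather than a ProcessPoolExecutor, that could be any thread in your process:

Added callables are called in the order that they were added and are always called in a thread belonging to the process that added them.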
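The ordering half of that guarantee can be checked directly. This is a minimal sketch, assuming a single-worker pool and a `threading.Event` (the names `release`, `task` and `make_callback` are illustrative only) to keep the task from finishing until every callback has been attached:

```python
import concurrent.futures
import threading

calls = []
release = threading.Event()

def task():
    # Block until the main thread has attached all callbacks.
    release.wait()
    return "done"

def make_callback(tag):
    def callback(future):
        calls.append((tag, future.result()))
    return callback

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(task)
    for tag in ("first", "second", "third"):
        future.add_done_callback(make_callback(tag))
    release.set()  # let the task finish so its stored callbacks fire
# Leaving the with-block waits for the worker, so every callback has run by now.

print(calls)  # → [('first', 'done'), ('second', 'done'), ('third', 'done')]
```

Note that the sketch only relies on the documented order; which thread ran the callbacks is deliberately not asserted.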


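In the current ThreadPoolExecutor implementation, the thread a callback executes in depends on the state of the Future when the callback is added, and on whether the Future is cancelled. These are implementation details; you should not rely on them, because they may differ between Python implementations or versions, and they are subject to change without notice.

If you add the callback after the Future has completed, the callback executes in whichever thread calls add_done_callback. You can see this in the add_done_callback source: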
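If the follow-up code genuinely has to run on the same thread as the submitted function, one portable option (not something this answer proposes) is to skip add_done_callback and call that code from inside the submitted callable. A minimal sketch, where `run_with_callback` is a hypothetical helper name and the callback receives the return value rather than a Future:

```python
import concurrent.futures
import threading

def run_with_callback(fn, callback, *args, **kwargs):
    """Hypothetical helper: run fn, then callback, both on the current (worker) thread."""
    result = fn(*args, **kwargs)
    callback(result)
    return result

def func(x):
    return threading.current_thread().name

mismatches = []

def callback(task_thread_name):
    # Record any case where the callback ran on a different thread than func.
    current = threading.current_thread().name
    if current != task_thread_name:
        mismatches.append((current, task_thread_name))

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(run_with_callback, func, callback, i) for i in range(1000)]
    concurrent.futures.wait(futures)

print(len(mismatches))  # → 0, because func and callback share a thread by construction
```

Because the callback is now an ordinary call inside the task, an exception it raises is stored on the Future like any other task exception, instead of being caught and logged by the Future's callback machinery as current CPython does for add_done_callback.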

def add_done_callback(self, fn):
    """Attaches a callable that will be called when the future finishes.

    Args:
        fn: A callable that will be called with this future as its only
            argument when the future completes or is cancelled. The callable
            will always be called by a thread in the same process in which
            it was added. If the future has already completed or been
            cancelled then the callable will be called immediately. These
            callables are called in the order that they were added.
    """
    with self._condition:
        if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
            self._done_callbacks.append(fn)
            return
    fn(self)

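If the Future's state indicates that it has been cancelled or has finished, fn is called immediately in the current thread of execution. Otherwise, it is appended to an internal list of callbacks that run once the Future completes.

For example: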

>>> import concurrent.futures, threading, time
>>> ex = concurrent.futures.ThreadPoolExecutor()
>>> def func(*args):
...     time.sleep(5)
...     print("func {}".format(threading.current_thread()))
...
>>> def cb(a): print("cb {}".format(threading.current_thread()))
...
>>> fut = ex.submit(func)
>>> func <Thread(Thread-1, started daemon 140084551563008)>
>>> fut.add_done_callback(cb)
cb <_MainThread(MainThread, started 140084622018368)>
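The same experiment can be made deterministic by waiting on the future explicitly instead of relying on the five-second sleep and typing speed. A minimal sketch, assuming CPython's current add_done_callback behaviour shown above:

```python
import concurrent.futures
import threading

def func():
    return threading.current_thread().name

seen = {}

def cb(future):
    seen["callback_thread"] = threading.current_thread().name

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    fut = executor.submit(func)
    worker_thread = fut.result()  # block until the future is FINISHED
    fut.add_done_callback(cb)     # already done, so cb runs right here, synchronously

# cb ran on the thread that called add_done_callback, not on the worker.
print(seen["callback_thread"] == threading.current_thread().name)  # → True
print(seen["callback_thread"] == worker_thread)                    # → False
```

This is also the likely explanation for the puzzling results in the question: without the sleep in func, a task sometimes finishes before the main thread reaches add_done_callback, so that callback runs in the main thread. With the random sleep, the future is almost always still pending when the callback is attached.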

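If a call to cancel successfully cancels the future, all of its callbacks are invoked immediately by the thread that performed the cancellation: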

def cancel(self):
    """Cancel the future if possible.
    Returns True if the future was cancelled, False otherwise. A future
    cannot be cancelled if it is running or has already completed.
    """
    with self._condition:
        if self._state in [RUNNING, FINISHED]:
            return False

        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            return True

        self._state = CANCELLED
        self._condition.notify_all()

    self._invoke_callbacks()
    return True
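A small sketch of the cancellation path, assuming a single-worker pool in which a blocking task (the illustrative `blocker`) keeps a second future queued, so that cancel can still succeed on it:

```python
import concurrent.futures
import threading

gate = threading.Event()
seen = {}

def blocker():
    # Occupies the only worker so the next task stays PENDING.
    gate.wait()

def cb(future):
    seen["thread"] = threading.current_thread().name
    seen["cancelled"] = future.cancelled()

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    executor.submit(blocker)
    pending = executor.submit(pow, 2, 10)  # queued behind blocker, not yet running
    pending.add_done_callback(cb)          # still pending, so cb is stored, not called
    cancelled = pending.cancel()           # invokes cb immediately on this thread
    gate.set()                             # release the worker so shutdown can finish

print(cancelled, seen["cancelled"], seen["thread"] == threading.current_thread().name)
# → True True True
```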

Otherwise, the callbacks are invoked by the worker thread that executed the future's task.
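A sketch of this last case, assuming an `Event` (here `gate`) holds the task open until the callback is attached. The comparison is done after the with-block, because in CPython fut.result() can return before the worker has finished invoking the callbacks, whereas shutdown joins the worker:

```python
import concurrent.futures
import threading

gate = threading.Event()
seen = {}

def func():
    gate.wait()  # stay running until the callback has been attached
    return threading.current_thread().name

def cb(future):
    seen["callback_thread"] = threading.current_thread().name

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    fut = executor.submit(func)
    fut.add_done_callback(cb)  # future not done yet, so cb is stored on it
    gate.set()                 # func returns and the worker runs cb
    task_thread = fut.result()

print(seen["callback_thread"] == task_thread)  # → True
```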

  • What about `ProcessPoolExecutor`? The callback's `get_ident` is also different (2 upvotes)