concurrent.futures.ThreadPoolExecutor.map():超时无效

Hao*_*ang 15 python concurrency concurrent.futures

import concurrent.futures
import time 

def process_one(i):
    try:                                                                             
        print("dealing with {}".format(i))                                           
        time.sleep(50)
        print("{} Done.".format(i))                                                  
    except Exception as e:                                                           
        print(e)

def process_many():
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: 
        executor.map(process_one,
                range(100),                                                          
                timeout=3)                                                           


if __name__ == '__main__':                                                           
    MAX_WORKERS = 10
    try:
        process_many()
    except Exception as e:                                                           
        print(e)      
Run Code Online (Sandbox Code Playgroud)

文件说:

返回的迭代器引发一个concurrent.futures.TimeoutErrorif __next__()被调用,结果timeout在原始调用的秒数之后不可用Executor.map()

但是这里的脚本没有引发任何异常并且一直在等待.有什么建议?

Tra*_*son 5

正如文档指定的那样,只有在调用__next__()地图上的方法时才会引发超时错误。要调用此方法,您可以例如将输出转换为列表:

from concurrent import futures
import threading
import time


def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n / 10


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    print('main: starting')
    try:
        # without this conversion to a list, the timeout error is not raised
        real_results = list(results) 
    except futures._base.TimeoutError:
        print("TIMEOUT")
Run Code Online (Sandbox Code Playgroud)

输出:

Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-9_0: done with 1
ThreadPoolExecutor-9_1: done with 2
TIMEOUT
ThreadPoolExecutor-9_2: done with 3
ThreadPoolExecutor-9_3: done with 4
ThreadPoolExecutor-9_4: done with 5
Run Code Online (Sandbox Code Playgroud)

在这里,第 n 个任务休眠了n几秒钟,因此在任务 2 完成后会引发超时。


编辑:如果您想终止未完成的任务,您可以尝试问题中的答案(ThreadPoolExecutor.map()尽管它们不使用),或者您可以忽略其他任务的返回值并让它们完成:

from concurrent import futures
import threading
import time


def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    outputs = []
    try:
        for i in results:
            outputs.append(i)
    except futures._base.TimeoutError:
        print("TIMEOUT")
    print(outputs)
Run Code Online (Sandbox Code Playgroud)

输出:

Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-5_0: done with 1
ThreadPoolExecutor-5_1: done with 2
TIMEOUT
[1, 2]
ThreadPoolExecutor-5_2: done with 3
ThreadPoolExecutor-5_3: done with 4
ThreadPoolExecutor-5_4: done with 5
Run Code Online (Sandbox Code Playgroud)