如何使用 concurrent.futures.ThreadPoolExecutor 或 multiprocessing.pool.ThreadPool 将一些变量绑定到线程?

rog*_*ger 1 python threadpoolexecutor python-multiprocessing

我想做的是这样的:

class MyThread(threading.Thread):
    def __init__(self, host, port):
        threading.Thread.__init__(self)
        # self._sock = self.initsocket(host, port)
        self._id = random.randint(0, 100)

    def run(self):
        for i in range(3):
            print("current id: {}".format(self._id))

def main():
    ts = []
    for i in range(5):
        t = MyThread("localhost", 3001)
        t.start()
        ts.append(t)

    for t in ts:
        t.join()
Run Code Online (Sandbox Code Playgroud)

我得到了这些输出:

current id: 10
current id: 10
current id: 13
current id: 43
current id: 13
current id: 10
current id: 83
current id: 83
current id: 83
current id: 13
current id: 98
current id: 43
current id: 98
current id: 43
current id: 98
Run Code Online (Sandbox Code Playgroud)

这个输出是我想要的。如您所见, my_id在不同线程中是不同的,但是在单线程中,我共享相同的_id.(_id只是这些变量之一,我还有许多其他类似的变量)。

现在,我想做同样的事情 multiprocessing.pool.ThreadPool

class MyProcessor():
    def __init__(self, host, port):
        # self._sock = self.initsocket(host, port)
        self._id = random.randint(0, 100)

    def __call__(self, i):
        print("current id: {}".format(self._id))
        return self._id * i

def main():
    with ThreadPool(5) as p:
        p.map(MyProcessor("localhost", 3001), range(15))
Run Code Online (Sandbox Code Playgroud)

但现在_id将被所有线程共享:

current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
Run Code Online (Sandbox Code Playgroud)

concurrent.futures.ThreadPoolExecutor,我也尝试做同样的事情:

class MyProcessor():
    def __init__(self, host, port):
        # self.initsocket(host, port)
        self._id = random.randint(0, 100)

    def __call__(self, i):
        print("current id: {}".format(self._id))
        return self._id * i

def main():
    with ThreadPoolExecutor(max_workers=5) as executor:
        func = MyProcessor("localhost", 3001)
        futures = [executor.submit(func, i) for i in range(15)]
        for f in as_completed(futures):
            pass
Run Code Online (Sandbox Code Playgroud)

输出是这样的:

current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
Run Code Online (Sandbox Code Playgroud)

当然,我得到这个结果并不奇怪,因为我只是调用__init__了一次。但我要问的是:

我怎么能用concurrent.futures.ThreadPoolExecutorand做同样的事情multiprocessing.pool.ThreadPool(也请不要更多的全局变量)。

Mad*_*ist 5

这里有几个问题,我会尽力解决所有问题。

在您给出的第一个示例中,您可以完全控制Thread您创建的所有s,因此每个线程在初始化程序中获得一个唯一 ID。问题当然是您一次启动所有线程,这对于大量线程来说可能非常低效。

在问题中的两个线程池示例中,您为可调用对象初始化 ID 一次,因此当然每个线程没有单独的 ID。正确的做法是为每个线程初始化一个 ID,__call__方法是:

类 MyProcessor():
    def __init__(self, host, port):
        self.initsocket(主机,端口)

    def __call__(self, i):
        id_ = random.randint(0, 100)
        打印(“当前ID:{}”。格式(id_))
        返回id_ * i

定义主():
    func = MyProcessor("本地主机", 3001)
    使用 ThreadPoolExecutor(max_workers=5) 作为执行器:
        collections.deque(executor.map(MyProcessor, range(15)), maxlen=0)

请注意,如果您只关心最终结果而不是中间对象,您也可以concurrent.futures.ThreadPoolExecutor通过使用map那里的方法来缩短示例Future。该deque(..., maxlen=0)调用是使用迭代器的标准习惯用法。

鉴于您在评论中链接到的要点,我理解您为什么想要拥有线程本地数据。但是,您当然不需要全局变量来实现该结果。这里有几个替代方案:

  1. 只需将您的线程本地数据添加到self初始化程序中,瞧,所有调用都可以访问它而不是全局的:

    def __init__(self, host, port):
        self.thread_local = threading.local()
    
    def __call__(self, i):
        try:
            id_ = self.thread_local.id_
        except AttributeError:
            id_ = random.randint(0, 100)
        ...
    
    Run Code Online (Sandbox Code Playgroud)
  2. 使用函数本地数据而不是线程本地数据。您正在使用线程本地数据来避免将您的连接(在要点中)传递给某些私有函数。这不是真正的需要,只是一种审美选择。你总是可以有def _send_data(self, conn, **kwargs)and def _recv_data(self, conn),因为连接实际上来自的唯一地方是__call__反正。

虽然可能存在选项1是一种可能性的情况下,我强烈建议您不要使用任何类型的线程池管理器的使用。线程池可以重用相同的线程从任务提交到的队列中顺序运行任务。这意味着您将在本应打开自己的任务中获得相同的连接。在您的原始示例中,您可以独立创建所有线程,但是当您MyProcessor对回收池线程进行多次调用时,可能就不行了。