Mar*_*tin 326 python multithreading missing-features
Is there a Pool class for worker threads, similar to the multiprocessing module's Pool class?

I like, for example, the easy way to parallelize a map function:
import multiprocessing

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))
However, I would like to do it without the overhead of creating new processes.

I know about the GIL. However, in my use case the function will be an IO-bound C function for which the python wrapper will release the GIL before the actual function call.

Do I have to write my own threading pool?
Mar*_*tin 426
I just found out that there actually is a thread-based Pool interface in the multiprocessing module, however it is somewhat hidden and not properly documented.

It can be imported via
from multiprocessing.pool import ThreadPool
It is implemented using a dummy Process class wrapping a python thread. This thread-based Process class can be found in multiprocessing.dummy, which is mentioned briefly in the docs. This dummy module supposedly provides the whole multiprocessing interface based on threads.
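For example, a minimal usage sketch (assuming Python 3; the fetch_url function and the URL list are just placeholders for whatever IO-bound work you have). Note that multiprocessing.dummy.Pool gives you this same ThreadPool class:

from multiprocessing.pool import ThreadPool
import urllib.request

def fetch_url(url):
    # blocks on network I/O, during which the GIL is released
    with urllib.request.urlopen(url) as response:
        return response.read()

urls = ['https://example.com'] * 4

# same map() API as multiprocessing.Pool, but backed by threads
with ThreadPool(4) as pool:
    pages = pool.map(fetch_url, urls)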
小智 208
In Python 3 you can use concurrent.futures.ThreadPoolExecutor, i.e.:
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)
See the documentation for more info and examples.
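A slightly fuller sketch of the same API (the square function is only an illustration), showing both submit() and map() with the executor used as a context manager so it shuts down cleanly:

from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

with ThreadPoolExecutor(max_workers=10) as executor:
    # submit() returns a Future; result() blocks until the call is done
    future = executor.submit(square, 7)
    print(future.result())                       # 49

    # map() mirrors the builtin map(), but runs the calls in the pool
    print(list(executor.map(square, range(5))))  # [0, 1, 4, 9, 16]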
war*_*res 57
Yes, and it seems to have (more or less) the same API.
import multiprocessing
import multiprocessing.pool  # needed to reach ThreadPool directly

def worker(lnk):
    ....

def start_process():
    ....

if PROCESS:
    pool = multiprocessing.Pool(processes=POOL_SIZE,
                                initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE,
                                           initializer=start_process)

pool.map(worker, inputs)
....
dgo*_*sen 37
For something very simple and lightweight (slightly modified from here):
from queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                print(e)
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()


if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print('sleeping for (%d)sec' % d)
        sleep(d)

    pool = ThreadPool(20)
    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)
    pool.wait_completion()
To support callbacks on task completion you can simply add the callback to the task tuple.
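One way to get a completion callback without modifying the class is to wrap the task function and its callback in a small helper and submit the wrapper instead; a minimal sketch reusing the ThreadPool and wait_delay defined above (with_callback and on_done are just illustrative names):

def with_callback(func, callback):
    # run func, then hand its return value to callback
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        callback(result)
        return result
    return wrapper

def on_done(result):
    print('task finished, result: %s' % result)

pool = ThreadPool(4)
pool.add_task(with_callback(wait_delay, on_done), 3)
pool.wait_completion()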
小智 11
Hi, to use the thread pool in Python you can use this library:
from multiprocessing.dummy import Pool as ThreadPool
and then to use it, this library works like this:
pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results
Here threads is the number of threads that you want, and tasks is the list of tasks that map to the service.
Another way is to add work to the thread pool's queue with an executor:
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(10):
        # submit() takes the callable first, then its arguments
        a = executor.submit(my_function, arg1, arg2, ....)
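If you also need the results back, the Future objects returned by submit() can be collected, for instance with concurrent.futures.as_completed(); a minimal sketch (the work function is only an illustration):

import concurrent.futures

def work(i):
    return i * i

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(work, i) for i in range(10)]
    # as_completed() yields each future as soon as it finishes,
    # not in submission order
    for future in concurrent.futures.as_completed(futures):
        print(future.result())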
This is what I ended up using. It is a modified version of the class by dgorissen above.

File: threadpool.py
from queue import Queue, Empty
import threading
from threading import Thread


class Worker(Thread):
    """ Thread executing tasks from a given tasks queue. Thread is signalable,
        to exit
    """
    _TIMEOUT = 2

    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                    timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()


def create_task(func, *args, **kwargs):
    return (func, args, kwargs)
To use the pool:
from random import randrange
from time import sleep

from threadpool import ThreadPool

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()
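The tasks argument of the constructor, together with the create_task() helper, also lets you seed the pool with pre-built (func, args, kwargs) tuples; a small sketch along the same lines (again assuming the class lives in threadpool.py):

from random import randrange
from time import sleep

from threadpool import ThreadPool, create_task

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

# build the task tuples up front and hand them to the pool
tasks = [create_task(wait_delay, randrange(1, 10)) for _ in range(30)]
pool = ThreadPool(20, tasks)
pool.wait_completion()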
Yes, there is a threading pool similar to the multiprocessing Pool, however, it is somewhat hidden and not properly documented. You can import it in the following way:
from multiprocessing.pool import ThreadPool
Just to show you a simple example:
def test_multithread_stringio_read_csv(self):
    # see gh-11786
    max_row_range = 10000
    num_files = 100

    bytes_to_df = [
        '\n'.join(
            ['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
        ).encode() for j in range(num_files)]

    files = [BytesIO(b) for b in bytes_to_df]

    # read all files in many threads
    pool = ThreadPool(8)
    results = pool.map(self.read_csv, files)
    first_result = results[0]

    for result in results:
        tm.assert_frame_equal(first_result, result)