hax*_*hax 7 python queue python-multithreading python-3.x
我的脚本遍历输入文件的每一行,并使用每一行中的字符串执行一些操作。由于在每一行上执行的任务是相互独立的,我决定将任务分开,threads这样脚本就不必等待任务完成才能继续循环。代码如下。
def myFunction(line, param):
# Doing something with line and param
# Sends multiple HTTP requests and parse the response and produce outputs
# Returns nothing
param = arg[1]
with open(targets, "r") as listfile:
for line in listfile:
print("Starting a thread for: ",line)
t=threading.Thread(target=myFunction, args=(line, param,))
threads.append(t)
t.start()
Run Code Online (Sandbox Code Playgroud)
我意识到这是一个坏主意,因为输入文件中的行数越来越大。使用此代码,线程数将与行数一样多。研究了一下,认为这queues就是方法。
我想了解在这种情况下使用队列的最佳方式,以及是否有任何我可以使用的替代方案。
为了解决这个问题,您可以使用线程池的概念,您可以在其中定义要使用的固定数量的线程/工作线程,例如 5 个工作线程,并且每当一个线程完成执行时,另一个 Future(ly) 提交的线程将自动取而代之。
例子 :
import concurrent.futures
def myFunction(line, param):
print("Done with :", line, param)
param = "param_example"
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = []
with open("targets", "r") as listfile:
for line in listfile:
print("Starting a thread for: ", line)
futures.append(executor.submit(myFunction, line=line, param=param))
# waiting for the threads to finish and maybe print a result :
for future in concurrent.futures.as_completed(futures):
print(future.result()) # an Exceptino should be handled here!!!
Run Code Online (Sandbox Code Playgroud)
队列是一种方法。使用它们的方法是将函数参数放入队列中,并使用线程来获取它们并进行处理。
在这种情况下,队列大小并不重要,因为读取下一行很快。在另一种情况下,更优化的解决方案是将队列大小设置为至少线程数的两倍。这样,如果所有线程同时处理完队列中的一个项目,它们都将准备好处理队列中的下一个项目。
为了避免代码复杂化,可以将线程设置为守护线程,以便它们在处理完成后不会阻止程序完成。当主进程完成时它们将被终止。
另一种方法是为每个线程在队列中放置一个特殊的项目(例如None),并让线程在从队列中获取它后退出,然后加入线程。
对于下面的示例,工作线程的数量是使用workers变量设置的。
这是使用队列的解决方案的示例。
from queue import Queue
from threading import Thread
queue = Queue(workers * 2)
def work():
while True:
myFunction(*queue.get())
queue.task_done()
for _ in range(workers):
Thread(target=work, daemon=True).start()
with open(targets, 'r') as listfile:
for line in listfile:
queue.put((line, param))
queue.join()
Run Code Online (Sandbox Code Playgroud)
更简单的解决方案可能是使用 ThreadPoolExecutor。在这种情况下特别简单,因为被调用的函数不会返回需要在主线程中使用的任何内容。
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=workers) as executor:
with open(targets, 'r') as listfile:
for line in listfile:
executor.submit(myFunction, line, param)
Run Code Online (Sandbox Code Playgroud)
另外,如果将所有行存储在内存中不是问题,那么有一个解决方案,除了线程之外不使用任何其他东西。工作的划分方式是线程从列表中读取某些行并忽略其他行。两个线程的一个简单示例是一个线程读取奇数行,另一个线程读取偶数行。
from threading import Thread
with open(targets, 'r') as listfile:
lines = listfile.readlines()
def work_split(n):
for line in lines[n::workers]:
myFunction(line, param)
threads = []
for n in range(workers):
t = Thread(target=work_split, args=(n,))
t.start()
threads.append(t)
for t in threads:
t.join()
Run Code Online (Sandbox Code Playgroud)
我已经做了一个快速基准测试,并且Queue比 稍快ThreadPoolExecutor,但是拆分工作的解决方案比两者都快。