将来自循环的多个线程排队的最安全方法是什么?

hax*_*hax 7 python queue python-multithreading python-3.x

我的脚本遍历输入文件的每一行,并使用每一行中的字符串执行一些操作。由于在每一行上执行的任务是相互独立的,我决定将任务分开,threads这样脚本就不必等待任务完成才能继续循环。代码如下。


def myFunction(line, param):
    # Doing something with line and param
    # Sends multiple HTTP requests and parse the response and produce outputs
    # Returns nothing

param = arg[1]   
with open(targets, "r") as listfile:
    for line in listfile:
        print("Starting a thread for: ",line)
        t=threading.Thread(target=myFunction, args=(line, param,)) 
        threads.append(t)
        t.start()

Run Code Online (Sandbox Code Playgroud)

我意识到这是一个坏主意,因为输入文件中的行数越来越大。使用此代码,线程数将与行数一样多。研究了一下,认为这queues就是方法。

我想了解在这种情况下使用队列的最佳方式,以及是否有任何我可以使用的替代方案。

Pix*_*teK 6

为了解决这个问题,您可以使用线程池的概念,您可以在其中定义要使用的固定数量的线程/工作线程,例如 5 个工作线程,并且每当一个线程完成执行时,另一个 Future(ly) 提交的线程将自动取而代之。

例子 :

import concurrent.futures

def myFunction(line, param):
    print("Done with :", line, param)

param = "param_example"

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    with open("targets", "r") as listfile:
        for line in listfile:
            print("Starting a thread for: ", line)
            futures.append(executor.submit(myFunction, line=line, param=param))

    # waiting for the threads to finish and maybe print a result :
    for future in concurrent.futures.as_completed(futures):
        print(future.result()) # an Exceptino should be handled here!!!
Run Code Online (Sandbox Code Playgroud)


Git*_*ont 5

队列是一种方法。使用它们的方法是将函数参数放入队列中,并使用线程来获取它们并进行处理。

在这种情况下,队列大小并不重要,因为读取下一行很快。在另一种情况下,更优化的解决方案是将队列大小设置为至少线程数的两倍。这样,如果所有线程同时处理完队列中的一个项目,它们都将准备好处理队列中的下一个项目。

为了避免代码复杂化,可以将线程设置为守护线程,以便它们在处理完成后不会阻止程序完成。当主进程完成时它们将被终止。

另一种方法是为每个线程在队列中放置一个特殊的项目(例如None),并让线程在从队列中获取它后退出,然后加入线程。

对于下面的示例,工作线程的数量是使用workers变量设置的。

这是使用队列的解决方案的示例。

from queue import Queue
from threading import Thread

queue = Queue(workers * 2)
def work():
    while True:
        myFunction(*queue.get())
        queue.task_done()

for _ in range(workers):
    Thread(target=work, daemon=True).start()

with open(targets, 'r') as listfile:
    for line in listfile:
        queue.put((line, param))
queue.join()
Run Code Online (Sandbox Code Playgroud)

更简单的解决方案可能是使用 ThreadPoolExecutor。在这种情况下特别简单,因为被调用的函数不会返回需要在主线程中使用的任何内容。

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=workers) as executor:
    with open(targets, 'r') as listfile:
        for line in listfile:
            executor.submit(myFunction, line, param)
Run Code Online (Sandbox Code Playgroud)

另外,如果将所有行存储在内存中不是问题,那么有一个解决方案,除了线程之外不使用任何其他东西。工作的划分方式是线程从列表中读取某些行并忽略其他行。两个线程的一个简单示例是一个线程读取奇数行,另一个线程读取偶数行。

from threading import Thread

with open(targets, 'r') as listfile:
    lines = listfile.readlines()

def work_split(n):
    for line in lines[n::workers]:
        myFunction(line, param)

threads = []
for n in range(workers):
    t = Thread(target=work_split, args=(n,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()
Run Code Online (Sandbox Code Playgroud)

我已经做了一个快速基准测试,并且Queue比 稍快ThreadPoolExecutor,但是拆分工作的解决方案比两者都快。