我有一个有界的信号量对象,确保我的程序一次不下载超过一定数量的文件.每个工作线程在开始下载时获取信号量,并在完成后释放它.
我有另一个线程,想要在没有下载任何内容时运行代码.我想要一种锁定方法,直到信号量完全可用.我怎么能用Python做到这一点?
import threading
threads = []
for n in range(0, 60000):
t = threading.Thread(target=function,args=(x, n))
t.start()
threads.append(t)
for t in threads:
t.join()
Run Code Online (Sandbox Code Playgroud)
它在我的笔记本电脑上的范围高达800,但是如果我将范围增加到800以上,我就会收到错误can't create new thread.
如何控制线程创建的数量或任何其他方式使其像超时一样工作?我尝试使用threading.BoundedSemaphore功能,但似乎没有正常工作.
我有一个项目列表aprox 60,000项 - 我想向数据库发送查询以检查它们是否存在以及它们是否确实返回了一些计算结果.我运行一个普通的查询,一个一个地遍历列表,查询已经运行了最后4天.我以为我可以使用线程模块来改进这一点.我做了这样的事
if __name__ == '__main__':
for ra, dec in candidates:
t = threading.Thread(target=search_sl, args=(ra,dec, q))
t.start()
t.join()
Run Code Online (Sandbox Code Playgroud)
我只测试了10个项目并且工作正常 - 当我提交了整个60k项目列表时,我遇到了错误,即"超出最大会话数".我想要做的是一次创建10个线程.当第一串线程完成激活时,我发送另一个请求,依此类推.
按下按钮后我的界面冻结了。我正在使用线程,但我不确定为什么仍然挂起。任何帮助将不胜感激。提前致谢
class magic:
def __init__(self):
self.mainQueue=queue.Queue()
def addItem(self,q):
self.mainQueue.put(q)
def startConverting(self,funcName):
if(funcName=="test"):
while not self.mainQueue.empty():
t = Thread(target = self.threaded_function)
t.start()
t.join()
def threaded_function(self):
time.sleep(5)
print(self.mainQueue.get())
m=magic()
def helloCallBack():
m.addItem("asd")
m.startConverting("test") //this line of code is freezing
B = tkinter.Button(top, text ="Hello", command = helloCallBack)
B.pack()
top.mainloop()
Run Code Online (Sandbox Code Playgroud) python multithreading tkinter python-multithreading python-3.x
我正在尝试使用一组计算机来运行数百万个小型模拟。为此,我尝试在主计算机上设置两台“服务器”,一台用于将队列中的输入变量添加到网络,另一台用于处理结果。
这是将内容放入模拟变量队列的代码:
"""This script reads start parameters and calls on run_sim to run the
simulations"""
import time
from multiprocessing import Process, freeze_support, Manager, Value, Queue, current_process
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
class MultiComputers(Process):
def __init__(self, sim_name, queue):
self.sim_name = sim_name
self.queue = queue
super(MultiComputers, self).__init__()
def get_sim_obj(self, offset, db):
"""returns a list of lists from a database query"""
def handle_queue(self):
self.sim_nr = 0
sims = self.get_sim_obj()
self.total = len(sims)
while len(sims) > 0:
if self.queue.qsize() > 100:
self.queue.put(sims[0]) …Run Code Online (Sandbox Code Playgroud) 我想知道如何限制这样的东西,一次只使用10个线程
with open("data.txt") as f:
for line in f:
lines = line.rstrip("\n\r")
t1 = Thread(target=Checker, args=("company"))
t1.start()
Run Code Online (Sandbox Code Playgroud) 我正在尝试解决一个问题,我有很多(大约一万个)URL,需要从所有这些网站下载内容.我一直这样做是为了"链接中的链接:"循环到现在为止,但现在花费的时间太长了.我认为是时候实现多线程或多处理方法了.我的问题是,最好的方法是什么?
我知道全局解释器锁,但由于我的问题是网络限制,而不是CPU限制,我认为这不会是一个问题.我需要将数据从每个线程/进程传回主线程/进程.我不需要帮助实现任何方法(当任何线程完成任务时终止多个线程),我需要建议采取哪种方法.我目前的做法:
data_list = get_data(...)
output = []
for datum in data:
output.append(get_URL_data(datum))
return output
Run Code Online (Sandbox Code Playgroud)
没有其他共享状态.
我认为最好的方法是拥有一个包含所有数据的队列,并从输入队列中弹出几个工作线程,获取URL数据,然后推送到输出队列.
我对吗?有什么我想念的吗?这是我第一次用任何语言实现多线程代码,我知道这通常是一个难题.
场景:
我有一个非常大的数据库模型迁移正在进行新的构建,我正在研究如何将Web应用程序中的当前实时数据迁移到本地测试数据库中.
我想在python中设置一个脚本,它将同时处理我的模型的迁移.我有我的模型实例from_legacy和to_legacy方法.到目前为止,我加载了所有实例并threads为每个实例创建,每个线程从核心threading模块子类化,使用一个run只进行转换并保存结果的方法.
我想让程序中的主循环构建这些线程的大堆实例,并开始逐个处理它们,在它工作时只运行最多10个,然后将下一个进行处理当其他人完成迁移时处理.
我无法弄清楚如何正确利用队列来做到这一点?如果每个线程代表移民的全部任务,我应该首先加载所有的实例,然后创建一个Queue与maxsize设置为10,并有只跟踪当前正在运行的队列?也许这样的事情?
currently_running = Queue()
for model in models:
task = Migrate(models) #this is subclassed thread
currently_running.put(task)
task.start()
Run Code Online (Sandbox Code Playgroud)
在这种情况下put,当它处于容量时依赖于阻塞的调用?如果我要走这条路,我怎么打电话task_done?
或者更确切地说,Queue是否应该包含所有任务(不仅仅是已启动的任务)并使用join阻止完成?调用join线程队列是否会启动包含的线程?
什么是最好的方法来处理"最多有N个运行线程"问题以及Queue应该扮演什么角色?
现在,我有一个for循环遍历列表的循环,通常此列表的长度为100-500。在for循环中,每个项目都会打开一个新线程。所以现在我的代码看起来像这样:
threads = []
for item in items:
t = threading.Thread(target=myfunction, args=(item,))
threads.append(t)
t.start()
Run Code Online (Sandbox Code Playgroud)
但是我不想每个线程都启动一个新线程,因为每个线程最多只需要花费几秒钟即可执行myfunction。我仍然想做我的循环,在参数中的每个项目上调用myfunction。但是一旦完成就关闭线程,并允许另一个线程接管。我要打开的最大线程数不小于3,不大于20。尽管更简单,但该范围可以变化。我只是不想在循环中的每个项目上打开一个新线程。
对于那些好奇的人,如果重要的话。myfunction是我定义的函数,它使用urllib将发布请求发送到站点。
我是python的新手,但不是一起编码的新手。对不起,菜鸟问题。
我在 Python 3 中使用多处理池时遇到了一个非常特殊的问题......请参阅下面的代码:
import multiprocessing as MP
class c(object):
def __init__(self):
self.foo = ""
def a(self, b):
return b
def main(self):
with open("/path/to/2million/lines/file", "r") as f:
self.foo = f.readlines()
o = c()
o.main()
p = MP.Pool(5)
for r in p.imap(o.a, range(1,10)):
print(r)
Run Code Online (Sandbox Code Playgroud)
如果我按原样执行此代码,这是我非常慢的结果:
1
2
3
4
5
6
7
8
9
real 0m6.641s
user 0m7.256s
sys 0m1.824s
Run Code Online (Sandbox Code Playgroud)
但是,如果我删除了该行o.main(),那么执行时间会快得多:
1
2
3
4
5
6
7
8
9
real 0m0.155s
user 0m0.048s
sys 0m0.004s
Run Code Online (Sandbox Code Playgroud)
我的环境具有充足的功能,并且我已确保不会遇到任何内存限制。我还用较小的文件进行了测试,执行时间更容易接受。有什么见解吗?
编辑:我删除了磁盘 IO …
我一直在myfunc使用threading.Thread 并行调用代码,如下所示:
def myfunc(elt,other):
subprocess.call("A matlab script that takes a while to execute")
allThreads = []
for elt in allElts:
allThreads.append(threading.Thread(target=myfunc,args=(elt,other)))
for t in allThreads:
t.start()
for t in allThreads:
t.join()
Run Code Online (Sandbox Code Playgroud)
由于数据量很大,我遇到了内存问题:我的一些subscribe.call引发了内存问题,无法分配.为了避免这个问题,我试图将同时执行的线程数限制为8.我将上面的代码更改为以下代码:
someThreads = []
k = 0
for k in range(len(allElts)):
if k%8 == 1:
for t in someThreads:
t.start()
for t in someThreads:
t.join()
someThreads = []
someThreads.append(threading.Thread(target=myfunc,args=(allElts[k],other)))
else:
someThreads.append(threading.Thread(target=myfunc,args=(allElts[k],other)))
k += 1
Run Code Online (Sandbox Code Playgroud)
这应该最多创建8个线程并执行它们.但是,这段代码的结果与我之前得到的结果不同,显然是错误的.这有什么问题?
我在列表中有一个很大的数据集,需要做一些工作。
我想在任意给定时间启动x数量的线程以在列表上工作,直到弹出该列表中的所有内容为止。
我知道如何在给定的时间(通过使用thread1 .... thread20.start())启动x数量的线程(说20个)
但是当前20个线程之一完成时,如何使它启动一个新线程?因此在任何给定时间,有20个线程在运行,直到列表为空。
我到目前为止所拥有的:
class queryData(threading.Thread):
def __init__(self,threadID):
threading.Thread.__init__(self)
self.threadID = threadID
def run(self):
global lst
#Get trade from list
trade = lst.pop()
tradeId=trade[0][1][:6]
print tradeId
thread1 = queryData(1)
thread1.start()
Run Code Online (Sandbox Code Playgroud)
更新资料
我的代码如下:
for i in range(20):
threads.append(queryData(i))
for thread in threads:
thread.start()
while len(lst)>0:
for iter,thread in enumerate(threads):
thread.join()
lock.acquire()
threads[iter] = queryData(i)
threads[iter].start()
lock.release()
Run Code Online (Sandbox Code Playgroud)
现在它从头开始启动20个线程...然后在一个线程结束时继续启动一个新线程。
但是,它效率不高,因为它等待列表中的第一个完成,然后再等待第二个..依此类推。
有更好的方法吗?
基本上我需要:
-Start 20 threads:
-While list is not empty:
-wait for 1 of the 20 threads to finish
-reuse …Run Code Online (Sandbox Code Playgroud) 要将列表加载到Python中的队列中,我发现此代码段无法正常工作。没有项目添加到队列:
from queue import Queue
my_list = [1,2,3,4,5,6,7,8,9,10]
q = Queue()
# This code doesn't work
map(q.put, my_list)
q.qsize() # Returns zero, which is unexpected
Run Code Online (Sandbox Code Playgroud)
更详细的解决方案:
for num in my_list:
q.put(num)
print(q.qsize()) # returns 10 as expected
Run Code Online (Sandbox Code Playgroud)
可以正常工作。我在这里想念什么?