我发现很难在Redis中创建一个具有300多个连接的连接池.我是在多线程应用程序的易变环境中使用它.有没有经历过这样的部署可以帮助我确定这是否可行?谢谢.
我想要一个提供最多X个线程来处理任务的线程池,到目前为止没问题.但是,每个提交的任务都可以指定特定限制的IO目标(比如Y).
因此,提交的IOTask以限制4(Y)返回目标"google.com",并且池具有全局限制16(X).我想提交10个google.com-tasks,其中只有4个并行处理,并且该池有12个线程可用于其他任务.
我怎样才能做到这一点?
我正在寻找Linux中的线程池抽象,它提供与Win32线程池提供的相同级别的内核调度程序支持.具体来说,我有兴趣找到一个维护一定数量的运行线程的线程池.当正在运行的池线程阻塞I/O时,我希望线程池足够智能以启动另一个运行的线程.
任何人都知道这样的Linux吗?
我试图用Python实现多处理.它可以在汇集非常快速的任务时工作,但是当汇集更长的任务时会冻结.请参阅下面的示例:
from multiprocessing import Pool
import math
import time
def iter_count(addition):
print "starting ", addition
for i in range(1,99999999+addition):
if i==99999999:
print "completed ", addition
break
if __name__ == '__main__':
print "starting pooling "
pool = Pool(processes=2)
time_start = time.time()
possibleFactors = range(1,3)
try:
pool.map( iter_count, possibleFactors)
except:
print "exception"
pool.close()
pool.join()
#iter_count(1)
#iter_count(2)
time_end = time.time()
print "total loading time is : ", round(time_end-time_start, 4)," seconds"
Run Code Online (Sandbox Code Playgroud)
在这个例子中,如果我在for循环中使用较小的数字(类似9999999)它可以工作.但是当它运行99999999时会冻结.我尝试按顺序运行两个进程(iter_count(1)
和iter_count(2)
),大约需要28秒,所以不是一个大任务.但是当我把它们汇集起来时它会冻结.我知道python中存在一些已知的多处理错误,但在我的情况下,相同的代码适用于较小的子任务,但冻结较大的子任务.
我有一个包含三个结构池的程序.对于它们中的每一个,我使用一个使用过的结构列表,另一个用于未使用的结构.在执行期间,程序使用结构,并根据需要将它们返回池中.此外,还有一个垃圾收集器来清理"僵尸"结构并将它们返回到池中.
在执行开始时,虚拟内存(如预期的那样)显示大约10GB*的内存分配,并且当程序使用该池时,RSS内存会增加.
虽然使用的节点返回池中,标记为未使用的节点,但RSS内存不会减少.我期待这一点,因为操作系统不知道我正在对内存做什么,无法注意到我是在真正使用它们还是管理池.
我想做的是强制未使用的内存随时返回虚拟内存,例如,当RSS内存增加到X GB以上时.
有没有办法在给定内存指针的情况下标记将其放入虚拟内存的内存区域?我知道这是操作系统的责任,但也许有办法强迫它.
也许我不应该关心这个,你怎么看?
提前致谢.
我提供了几个文件的池使用情况与内存使用情况的图片.正如您所看到的,池使用中的突然下降是由于垃圾收集器,我希望看到的是,这种下降反映在内存使用中.
假设我有一个任务提供程序-可读的通道,它可能会或可能不会提供任务(取决于工作量)具体来说,这样一来可能几个小时都没有工作,然后可能突然出现任务
我想让我的goroutine池从1增长到N,其中N是出现工作时的最大并发性,然后自动崩溃到1,在那里goroutine的工作时间超过X秒,以避免内存/ cpu浪费。
我本可以只使用一个固定的池,因为goroutine非常便宜,但是我不喜欢有成千上万个空闲的goroutine,我可能会更好地使用那些资源(应该主要是ram,但仍然可以)
折叠部分相当容易
for {
timeoutTimer := time.NewTimer(WORKER_ROUTINE_TIMEOUT)
select {
case taskContext, isBatchRunning := <-runner.tasksCh:
if !isBatchRunning {
log.Print("task provider is closed, quit worker goroutine")
return
}
runner.job.Process(&taskContext)
case <-timeoutTimer.C:
return
}
}
Run Code Online (Sandbox Code Playgroud)
但是我不确定如何使池动态增长,即在哪种条件下产生新的池
此池的优先级是能够对增加的负载做出快速反应并扩展到N(最大并发)goroutines,并能够在工作负载减少时最终崩溃到更合理的数量(最小为1)。
PS:我看到了一个https://github.com/Jeffail/tunny包,但看起来它与当前池大小的自适应缩放没有任何相似之处。我想念什么吗?
谢谢!
这个问题可能是重复的。但是,我在这个话题上读了很多东西,但没有找到与我的情况相符的东西-至少,我不理解。
不便之处,敬请原谅。
我想做的是相当普遍的,将kwargs列表传递给pool.starmap(),以实现多处理。
这是我的简化案例:
def compute(firstArg, **kwargs): # A function that does things
# Fancy computing stuff...
# Saving to disk...
return True
if __name__ == '__main__':
args = [{
'firstArg': "some data",
'otherArg': "some other data"
'andAnotherArg': x*2
} for x in range(42)]
pool = Pool(4)
pool.starmap(compute, args)
pool.close()
pool.terminate()
Run Code Online (Sandbox Code Playgroud)
我以为starmap()将解压缩字典并将其作为关键字args传递给compute(),但是查看源代码(另请参见l.46),它仅发送键(或值?)。
因此它提出了:
TypeError: compute() takes 1 positional argument but 3 were given
Run Code Online (Sandbox Code Playgroud)
它必须是一种清晰,直接的方法来进行此操作。任何帮助将不胜感激。
这是一个非常类似的问题:Python多重处理-如何将kwargs传递给函数?
我将python 2.7与multiprocessing :: Pool一起使用以并行运行作业
我简化了下面的示例,但这是它的主要要旨。
它将使用该apply_async()
函数为我的字典中的每个人创建一个文件。但是,当我检查文件是否正确创建时,我注意到有时文件没有创建。
现在我想我在使用multiprocessing :: Pool的方式上做错了
有什么建议吗?
import os
from multiprocessing import Pool
def outputFile(person):
ofh=open(person+'.txt','w')
ofh.write('test\n')
ofh.close()
pool = Pool(processes=4)
for person in person_dict:
pool.apply_async(outputFile,args(person))
pool.close()
pool.join()
for person in person_dict:
print os.path.isfile(person+'.txt')
Run Code Online (Sandbox Code Playgroud)
True
True
False
True
Run Code Online (Sandbox Code Playgroud) 我有一个简单的服务器:
from multiprocessing import Pool, TimeoutError
import time
import os
if __name__ == '__main__':
# start worker processes
pool = Pool(processes=1)
while True:
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
try:
print(res.get(timeout=1)) # prints the PID of that process
except TimeoutError:
print('worker timed out')
time.sleep(5)
pool.close()
print("Now the pool is closed and no longer available")
pool.join()
print("Done")
Run Code Online (Sandbox Code Playgroud)
如果我运行这个,我会得到类似的东西:
47292
47292
Run Code Online (Sandbox Code Playgroud)
然后我kill 47292
在服务器运行时.启动了新的工作进程,但服务器的输出是:
47292
47292
worker timed out
worker timed out
worker …
Run Code Online (Sandbox Code Playgroud) pool ×10
python ×4
java ×2
boost ×1
boost-asio ×1
c ×1
c++ ×1
concurrency ×1
connection ×1
go ×1
goroutine ×1
linux ×1
memory ×1
multiprocess ×1
python-2.7 ×1
redis ×1
scala ×1
scheduler ×1
shutdown ×1
worker ×1