我注意到Gevent有线程池对象.有人可以向我解释何时使用线程池以及何时使用常规池?什么是gevent.threadpool和gevent.pool之间的区别?
我有一个应用程序,它有一个线程池(ThreadPoolExecutor),它是每个执行HttpGet操作的传递任务,并将InputStream读入byte []以执行某些操作.
在阅读了HttpClient文档后,我得出的结论是,跨多个线程管理HttpClient连接的最佳方法是创建一个ThreadSafeClientConnManager并在整个应用程序中共享它.
实现这一点之后,我注意到即使在完成所有任务之后,仍然有大量内存仍由ThreadSafeClientConnManager使用.
查看堆转储,此内存采用byte []数组的形式.这些不是我创建的任何引用.它们由ThreadSafeClientConnManager及其池的各个部分保存.我不确定它们是否与InputStream相关或者它们是否是其他内容.
所有任务本身及其变量都被成功地垃圾收集.
如果我在ThreadSafeClientConnManager上调用getConnectionManager().shutdown(),则释放所有内存就好了.但是,我不想关闭连接,因为这些HttpGet任务可能随时发生.我希望在应用程序生命期间保持打开状态.
随着HttpGet任务的运行,持有的内存越来越多,最终可能导致内存不足错误.任务完成后,内存不会被释放.
在完成使用它的任务后,如何确保释放内存?
这是我正在使用的代码.它与HttpClient文档中的代码拼凑在一起,其他问题在SO和在线上.
HttpClient的创建:
// Create and initialize HTTP parameters
HttpParams params = new BasicHttpParams();
HttpConnectionParams.setConnectionTimeout(params, 40 * 1000);
HttpConnectionParams.setSoTimeout(params, 40 * 1000);
ConnManagerParams.setMaxTotalConnections(params, 100);
HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
// Create and initialize scheme registry
SchemeRegistry schemeRegistry = new SchemeRegistry();
schemeRegistry.register( new Scheme("http", PlainSocketFactory.getSocketFactory(), 80));
schemeRegistry.register(new Scheme("https", SSLSocketFactory.getSocketFactory(), 443));
// Create an HttpClient with the ThreadSafeClientConnManager.
// This connection manager must be used if more than one thread will
// be using the HttpClient. …Run Code Online (Sandbox Code Playgroud) 我知道有一些现存的问题,它们提供了很好的一般观点.我希望在C#/ VB.Net方面获得一些有关这些观点的实际实现(而不是哲学)的细节.
我有一个WCF服务,除其他外,它接收文件.对于服务的大部分时间而言,这个特定区域实际上只是无所事事 - 当工作确实到来时,它会以极大不同的数量突然到达.
对于收到的每个文件(最多可以是每秒数千个),服务需要在1-10秒(每个)之间处理文件,具体取决于许多其他服务,本地资源和网络IO等待时间.
为了帮助这些突发工作负载的服务,我实现了一个队列系统.每秒收到的数千个文件被放置在队列中.控制器根据队列的大小计算要使用的线程数,直到达到"峰值最大线程数"设置,这将阻止它创建其他线程.这些线程放在线程池中,并重用于循环队列.控制器将; 每隔一段时间 重新计算所需的线程数.如果队列大小减小,则释放相关数量的线程.
我应该达到多少线程?很明显,每次收到一个文件时添加一个新的线程对于缺少一个更好的词来说是愚蠢的 - 性能最多会恶化.当每个核心的CPU利用率仅为10%时,限制线程,似乎也不是资源的最佳使用.
那么,有没有一种合适的方法来确定要限制的线程数?我希望服务可以通过对可用资源进行抽样来确定这一点,但这样做是否会影响性能?我知道常见的答案是监控工作负载,通过反复试验调整计数,直到找到我喜欢的数字,但由于此服务的性质(长时间闲置,然后是高/突发工作负载),可能需要很长时间是时候得到那种信息了.
那么,如果我们将服务器的图像移动到与第一个更快/更慢/不同的不同主机上呢?我必须重新重新整理过程吗?
理想情况下,我所追求的是协调员智能地增加线程池的大小,直到CPU利用率为x%(80%是合理的?90%?99%?).显然,我想这样做而不需要添加超过命中x%所需的线程,否则所有我最终都会遇到的不仅仅是等待IO资源,而是等待彼此.
提前致谢!
相关问题(如果您想要一些通用的想法):
如果我没有让问题变得更加困难,哪里会很有趣?
按照目前的情况,该服务在这些爆发期间定期达到100%cpu.问题是CPU利用率高峰.它从空闲(0-10%)变为100%,然后再次退回.我不确定我能帮到那个 - 理想情况下我不会把它全部带到100%.问题的存在是因为提到的文件实际上是图像,服务过程的一部分是将图像传递给System.Windows.Media黑盒,它为我做了一些复杂的图像处理.
由于IO等待和其他正在进行的处理,因此峰值之间存在间歇.如果达到100%的峰值无法帮助(我知道如何防止这种情况,或者我应该知道)我应该如何针对CPU利用率图表进行研究?经常坐在100%?弹跳在50-100之间?如果我确实通过抽样来确定哪些效果最好,那么是否可以保证切换虚拟服务器的主机在同一个图表中也能发挥最佳效果?
对于那些愿意回答的人,我不会考虑这种复杂性.随意忽略这一部分.然而,任何答案也解释了这种复杂性,甚至答案只是提供如何处理它的提示,我至少会upvote!
哎呀很长的问题 - 抱歉 - 并且感谢您的阅读!
Sun Java(1.6)ScheduledThreadPoolExecutor是ThreadPoolExecutor内部扩展,它的实现DelayQueue是一个无界队列.我需要的是一个ScheduledThreadpoolExecutor有界队列即它在队列中的积累,这样,当队列中的任务超过了限度,它开始拒绝进一步提交的任务,防止JVM走出去的记忆任务的限制.
令人惊讶的是,谷歌或stackoverflow没有指出我正在讨论这个问题的任何结果.有没有这样的东西可用我错过了?如果没有,我如何实现ScheduledThreadpoolExecutor以最佳方式提供我期望的功能?
我搜索了很多但找不到任何解决方案.我用这样的方式使用java线程池:
ExecutorService c = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; ++i) {
c.execute(new MyTask(i));
}
Run Code Online (Sandbox Code Playgroud)
以这种方式,任务以后续顺序执行(如在队列中).但我需要改变"选择下一个任务"策略.所以我希望为每个任务分配指定优先级(它不是线程优先级),并且执行任务对应于这些优先级.因此,当执行程序完成另一个任务时,它会将下一个任务选为具有最高优先级的任务.它描述了常见问题.也许有更简单的方法不考虑优先级.它选择最后添加的任务作为执行而不是第一次添加.粗略地讲,FixedThreadPool使用FIFO策略.我可以使用例如LIFO策略吗?
我需要在Java中构建一个工作池,每个工作者都有自己的连接套接字; 当工作线程运行时,它使用套接字但保持打开以便以后重用.我们决定使用这种方法,因为与临时创建,连接和销毁套接字相关的开销需要太多的开销,所以我们需要一种方法,通过这种方法,工作池预先初始化了它们的套接字连接,准备好在保持套接字资源不受其他线程影响的同时承担工作(套接字不是线程安全的),所以我们需要这些内容......
public class SocketTask implements Runnable {
Socket socket;
public SocketTask(){
//create + connect socket here
}
public void run(){
//use socket here
}
Run Code Online (Sandbox Code Playgroud)
}
在应用程序启动时,我们想要初始化工作程序,并希望套接字连接在某种程度上......
MyWorkerPool pool = new MyWorkerPool();
for( int i = 0; i < 100; i++)
pool.addWorker( new WorkerThread());
Run Code Online (Sandbox Code Playgroud)
当应用程序请求工作时,我们将任务发送到工作池以立即执行...
pool.queueWork( new SocketTask(..));
Run Code Online (Sandbox Code Playgroud)
更新了工作代码
根据Gray和jontejj的有用评论,我有以下代码工作...
SocketTask
public class SocketTask implements Runnable {
private String workDetails;
private static final ThreadLocal<Socket> threadLocal =
new ThreadLocal<Socket>(){
@Override
protected Socket initialValue(){
return new Socket();
}
};
public SocketTask(String details){ …Run Code Online (Sandbox Code Playgroud) 在使用2Tb DRAM的80核(160HT)nehalem架构上运行一些测试后,我遇到了一个小的HPC问题:
当每个线程开始请求关于"错误"套接字上的对象的信息时,具有多于2个套接字的服务器开始停顿很多(延迟),即请求来自正在处理一个套接字上的某些对象的线程以提取信息这实际上是在另一个插槽上的DRAM中.
即使我知道他们正在等待远程套接字返回请求,核心也会100%被利用.
由于大多数代码以异步方式运行,因此重写代码要容易得多,因此我只需解析来自一个套接字上的线程的消息就可以解析其他代码(没有锁定等待).另外我想将每个线程锁定到内存池,因此我可以更新对象而不是浪费时间(~30%)在垃圾收集器上.
因此问题是:
如何在Python中使用预定的内存池对象将线程固定到核心?
更多背景:
当你把ZeroMQ放在中间并且在每个ZMQworker管理的内存池之间传递消息时,Python运行多核没有问题.在ZMQ的8M msg /秒时,对象的内部更新需要比管道填充更长的时间.这一切都在这里描述:http://zguide.zeromq.org/page:all # Chapter-Sockets-and-Patterns
因此,稍微过度简化,我会生成80个ZMQworkerprocesses和1个ZMQrouter,并使用大量对象加载上下文(实际上是5.84亿个对象).从这个"起始点"开始,对象需要进行交互以完成计算.
这是个主意:
要做到这一点,我需要知道:
但是我无法在python文档中找到关于如何执行此操作的参考资料,并且在google上我必须搜索错误的内容.
更新:
关于"为什么在MPI架构上使用ZeroMQ?"的问题,请阅读主题:Spread vs MPI vs zeromq?由于我工作的应用程序被设计用于即使它在架构测试,其中MPI分布式部署是更合适的.
更新2:
关于这个问题:
"如何在Python(3)中将线程固定到具有预定内存池的核心"答案在psutils中:
>>> import psutil
>>> psutil.cpu_count()
4
>>> p = psutil.Process()
>>> p.cpu_affinity() # get
[0, 1, 2, 3]
>>> p.cpu_affinity([0]) # set; from now on, this process will run on CPU #0 only
>>> p.cpu_affinity()
[0]
>>>
>>> # reset affinity …Run Code Online (Sandbox Code Playgroud) 我试图找到一种方法来直接从multiprocessing.PoolPython 的类实例获取进程数.有没有办法做到这一点?
文档没有显示任何相关内容.
谢谢
所以我有一个我正在编写的算法,该函数multiprocess应该调用另一个函数,CreateMatrixMp()并行调用与cpus一样多的进程.我以前从未做过多处理,也不能确定以下哪种方法更有效.在函数的上下文中使用"高效"这个词CreateMatrixMp()需要被调用数千次.我已经阅读了python multiprocessing模块的所有文档,并且已经有了这两种可能性:
首先是使用这个Pool类:
def MatrixHelper(self, args):
return self.CreateMatrix(*args)
def Multiprocess(self, sigmaI, sigmaX):
cpus = mp.cpu_count()
print('Number of cpu\'s to process WM: %d' % cpus)
poolCount = cpus*2
args = [(sigmaI, sigmaX, i) for i in range(self.numPixels)]
pool = mp.Pool(processes = poolCount, maxtasksperchild= 2)
tempData = pool.map(self.MatrixHelper, args)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
接下来是使用这个Process类:
def Multiprocess(self, sigmaI, sigmaX):
cpus = mp.cpu_count()
print('Number of cpu\'s to process WM: %d' % cpus)
processes = [mp.Process(target …Run Code Online (Sandbox Code Playgroud) 我有一个在ASP.NET MVC应用程序中使用的以下代码示例.此代码的目的是为排队一些长时间运行的操作创建"即发即弃"请求.
public JsonResult SomeAction() {
HttpContext ctx = HttpContext.Current;
Task.Run(() => {
HttpContext.Current = ctx;
//Other long running code here.
});
return Json("{ 'status': 'Work Queued' }");
}
Run Code Online (Sandbox Code Playgroud)
我知道这不是在异步代码中处理HttpContext.Current的好方法,但是目前我们的实现不允许我们做其他事情.我想了解这段代码有多危险......
问题:理论上可以在Task.Run中设置HttpContext,将上下文设置为另一个请求吗?
我想是的,但我不确定.我是如何理解的:Request1是从线程池中的Thread1处理的,然后当Thread1绝对处理另一个请求(Request2)时,Task.Run中的代码将设置从Request1到Request2的上下文.
也许我错了,但我对ASP.NET内部的了解不允许我正确地理解它.
谢谢!
threadpool ×10
java ×4
python ×4
concurrency ×2
.net-4.0 ×1
android ×1
asp.net ×1
asp.net-mvc ×1
asynchronous ×1
c# ×1
gevent ×1
hpc ×1
httpclient ×1
httpcontext ×1
memory-leaks ×1
thread-local ×1