Mat*_*att 2 php multithreading garbage-collection memory-leaks pthreads
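Given the following code, how can I make sure that finished MyWorker objects are destroyed and their memory is freed?
My script needs about 50 threads that continuously fetch data with cURL and process it.
I have tried having the threads never leave run(), and, as in this sample code, having them leave run() and letting the collect function spawn new copies of them.
Either way, I hit the memory limit after about a minute. Can you tell me what I am doing wrong?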
class MyWorker extends Threaded
{
    public $complete;
    public function __construct() {$this->complete = false;}
    public function run() {$this->complete = true;}
}
$pool = new Pool(50);
for($i=0; $i<50; $i++)
    $pool->submit(new MyWorker());
$pool->collect(function($worker)
{
    global $pool;
    if($worker->complete == true)
        $pool->submit(new MyWorker());
    return $worker->complete;
});
$pool->shutdown();
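Why do I have to collect?
Worker threads provided by pthreads require the programmer to retain correct references to the Threaded objects that are being executed. That is hard to do reliably in userland, so pthreads provides the Pool abstraction, which maintains those references for you.
To maintain those references, pthreads needs to know when an object is garbage, and it provides the Pool::collect interface for that purpose. Pool::collect accepts a Closure, which in turn accepts a Threaded object and returns boolean true if the passed object has finished executing.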
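The contract described above can be sketched in plain PHP without the pthreads extension. This is only a model under stated assumptions: `FakeTask` and `ReferenceHolder` are hypothetical names invented for illustration, not pthreads classes, and returning the number of tasks still held is a convenience of this sketch rather than a claim about what `Pool::collect` returns. It shows only the idea that references are held until the collector callback reports a task as finished.

```php
<?php
// Hypothetical stand-in for a task; NOT a pthreads class.
class FakeTask {
    public $name;
    public $done;
    public function __construct($name, $done) {
        $this->name = $name;
        $this->done = $done;
    }
}

// Hypothetical model of the references a pool retains; NOT pthreads' Pool.
class ReferenceHolder {
    private $held = [];

    // Keep a reference so the task stays alive while it is "executing".
    public function submit(FakeTask $task) {
        $this->held[] = $task;
    }

    // Call $collector on each held task and drop the ones it reports as
    // garbage. Returns the number of tasks still held (sketch-only choice).
    public function collect(callable $collector) {
        $kept = [];
        foreach ($this->held as $task) {
            if (!$collector($task)) {
                $kept[] = $task;
            }
        }
        $this->held = $kept;
        return count($this->held);
    }
}

$holder = new ReferenceHolder();
$holder->submit(new FakeTask("a", true));
$holder->submit(new FakeTask("b", false));
$holder->submit(new FakeTask("c", true));

$collected = [];
$remaining = $holder->collect(function (FakeTask $task) use (&$collected) {
    if ($task->done) {
        $collected[] = $task->name;
        return true;   // finished: the holder may release it
    }
    return false;      // still running: keep the reference
});

// prints "collected=a,c remaining=1"
printf("collected=%s remaining=%d\n", implode(",", $collected), $remaining);
```

In this model, a task whose callback never returns true is never released, which is the same reason uncollected objects accumulate in a real pool until memory runs out.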
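The task at hand ...
To keep submitting tasks for execution without exhausting resources, you must build a queue of completed tasks and resubmit them to the Pool.
The following code demonstrates a sane way to do that: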
<?php
/* one mutex shared by every thread, used to serialise log output */
define("LOG", Mutex::create());

/* thread safe log to stdout: slog($format, ...values) */
function slog($message, $args = []) {
    $args = func_get_args();
    if (($message = array_shift($args))) {
        Mutex::lock(LOG);
        echo vsprintf("{$message}\n", $args);
        Mutex::unlock(LOG);
    }
}

class Request extends Threaded {
    protected $url;
    protected $queued = false;

    public function __construct($url) {
        $this->url = $url;
    }

    /* fetch the URL, log the size, then mark this job as ready to requeue */
    public function run() {
        $response = @file_get_contents($this->url);
        slog("%s returned %d bytes",
            $this->url, strlen($response));
        $this->reQueue();
    }

    public function getURL()   { return $this->url; }
    public function isQueued() { return $this->queued; }
    public function reQueue()  { $this->queued = true; }
}

/* create a pool of 50 threads */
$pool = new Pool(50);

/* submit 50 requests for execution */
for ($i = 1; $i <= 50; $i++) {
    $pool->submit(new Request(sprintf(
        "http://google.com/?q=%s", md5($i))));
}

do {
    $queue = array();

    $pool->collect(function($request) use (&$queue) {
        /* check for items to requeue */
        if ($request->isQueued()) {
            /* get the url for the request, insert into queue */
            $queue[] = $request->getURL();
            /* allow this job to be collected */
            return true;
        }
        /* still running: keep the reference */
        return false;
    });

    /* resubmit completed tasks to pool, outside the collect callback */
    foreach ($queue as $queued) {
        $pool->submit(new Request($queued));
    }

    /* sleep for a couple of seconds here ... because, be nice ! */
    usleep(2500000);
} while (true);
?>