PHP pThreads - 你如何执行垃圾收集?

Mat*_*att 2 php multithreading garbage-collection memory-leaks pthreads

鉴于以下代码,如何确保已完成的MyWorker对象被销毁/其内存被释放?

由于我的脚本需要我需要~50个线程不断从cURL获取数据并进行处理.

我已经尝试过让线程永远不会离开run(),或者如这个示例代码中所示,它们会离开run并让collect函数生成它们的新副本.

但不管我在一分钟左右后达到内存限制.你能告诉我我做错了什么吗?

class MyWorker extends Threaded
{
    public $complete;
    public function __construct() {$this->complete = false;}
    public function run() {$this->complete = true;}
}

$pool = new Pool(50);
for($i=0; $i<50; $i++)
    $pool->submit(new MyWorker());
$pool->collect(function($worker)
{
    global $pool;
    if($worker->complete == true)
        $pool->submit(new MyWorker());
    return $worker->complete;
});
$pool->shutdown();
Run Code Online (Sandbox Code Playgroud)

Joe*_*ins 8

为什么

我为什么要收集呢?

Worker通过并行线程提供线程需要程序员保留正确的引用Threaded正在执行的对象.程序员很难在userland中可靠地实现,因此pthreads提供了为您维护引用的Pool抽象Workers.

为了保持这些引用,pthreads需要知道对象何时是垃圾,它Pool::collect为此提供了接口.Pool::collect接受一个Closure,它接受一个Threaded对象,true如果传递的对象完成执行,则返回布尔值.

怎么样

手头的任务 ...

为了继续提交执行任务而不是耗尽资源,您必须创建一个已完成任务的队列,以便重新提交 Pool

以下代码演示了一种理智的方式:

<?php

define("LOG", Mutex::create());
/* thread safe log to stdout */
function slog($message, $args = []) {
    $args = func_get_args();
    if (($message = array_shift($args))) {
        Mutex::lock(LOG);
        echo vsprintf(
            "{$message}\n", $args);
        Mutex::unlock(LOG);
    }
}

class Request extends Threaded {
    public function __construct($url) {
        $this->url = $url;
    }

    public function run() {
        $response = @file_get_contents($this->url);

        slog("%s returned %d bytes",
            $this->url, strlen($response));

        $this->reQueue();
    }

    public function getURL()        { return $this->url; }

    public function isQueued()      { return $this->queued; }
    public function reQueue()       { $this->queued = true; }

    protected $url;
    protected $queued = false;
}

/* create a pool of 50 threads */
$pool = new Pool(50);

/* submit 50 requests for execution */
while (@$i++<50) {
    $pool->submit(new Request(sprintf(
        "http://google.com/?q=%s", md5($i))));
}

do {
    $queue = array();

    $pool->collect(function($request) use ($pool, &$queue) {
        /* check for items to requeue */
        if ($request->isQueued()) {
            /* get the url for the request, insert into queue */
            $queue[] = 
                $request->getURL();
            /* allow this job to be collected */
            return true;
        }
    });

    /* resubmit completed tasks to pool */
    if (count($queue)) {
        foreach ($queue as $queued)
            $pool->submit(new Request($queued));
    }

    /* sleep for a couple of seconds here ... because, be nice ! */
    usleep(2.5 * 1000000);
} while (true);
?>
Run Code Online (Sandbox Code Playgroud)