PHP pthreads - 共享对象

cot*_*ton 5 php multithreading pthreads synchronized

我正在寻找一种安全快捷的方式来使用共享对象.

我在这里问过这个问题:https://github.com/krakjoe/pthreads/issues/470 但是显然这不是正确的地方.

尝试与许多其他上下文(Thread)共享一个对象(Threaded).所有线程都在更新此分片对象 - 它们可以设置自己的请求,也必须响应其他人的请求.

现在krakjoe回应说锁定/解锁在7中不可用我有问题.

我知道:.synchronized但不知道如何使用它来满足我的需求.

我如何使用:: synchronized来编写类似的方法

  • 锁()
  • 开锁()
  • is_locked() - 检查是否已锁定,如果是,请不要尝试 - 继续尝试稍后

编辑:

我写了一个(imo)非常简单的测试脚本.

这个脚本包括没有 SYC /锁/ ...方法大气压.

它应该只显示我想要做的事情.

我仍然在寻找一种方法来使用::使这个共享安全.

码:

<?php
/*
TEST:
    create n threads
    each will
        - Shared::set() its own ref
        - check if Shared::exists() its own ref
        - Shared::get() its ref back
        - call method ::isRunning() at returned val to easily check if is ref or got overwritten by another context

TODO:
    using ::synchronized to handle multi-context-access

NOTES:
    every method as public to prevent pthreads v2 "Method Modifiers - Special Behaviour"
        see: "Method Modifiers - Special Behaviour"
            at http://blog.krakjoe.ninja/2015/08/a-letter-from-future.html
*/
class Shared extends Threaded
{
    public $data;
    public function exists($ident)
    {
        return isset($this->data[$ident]);
    }
    public function set($ident, $ref)
    {
        $return = false;
        if(!isset($this->data[$ident])){
            $data = $this->data;
            $data[$ident] = $ref;
            $this->data = $data;
            $return = $this->data[$ident];
        }
        #echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL;
        return $return;
    }
    public function get($ident)
    {
        $return = false;
        if($this->exists($ident) === true){
            $data = $this->data;
            $return = $data[$ident];
            unset($data[$ident]);
            $this->data = $data;
        }
        #echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL;
        return $return;
    }
}

class T extends Thread
{
    public $count;
    public function __construct(Shared $Shared, $ident)
    {
        $this->Shared = $Shared;
        $this->ident = $ident;
    }
    public function run()
    {
        $slowdown = true;
        $this->count = 0;
        while(true){
            if($slowdown){
                // "don't allow usleep or sleep" : https://github.com/krakjoe/pthreads/commit/a157b34057b0f584b4db326f30961b5c760dead8
                //  loop a bit to simulate work:
                $start = microtime(true);
                $until = rand(1, 100000)/1000000;
                while(microtime(true)-$start < $until){
                    // ...
                }
            }

            if($this->Shared->exists($this->ident) === true){
                $ref = $this->Shared->get($this->ident);
            }
            else{
                $ref = $this->Shared->set($this->ident, $this);
            }
            // calling a method on $ref -- if not a ref we crash
            $ref->isRunning();
            unset($ref);
            $this->count++;
        }
    }
}


echo 'start ...' . PHP_EOL;

$n = 8;
$Shared = new Shared();
for($i = 0, $refs = array(); $i < $n; $i++){
    $refs[$i] = new T($Shared, $i);
    $refs[$i]->start();
}

while(!empty($refs)){
    // print status:
    if(!isset($t)or microtime(true)-$t > 1){
        $t = microtime(true);
        echo 'status: ' . count($refs) . ' running atm ...' . PHP_EOL;
    }

    // join crashed threads:
    foreach($refs as $i => $thread){
        if($thread->isRunning() === false){
            echo 'T-' . $i . ' stopped after ' . $thread->count . PHP_EOL;
            if($thread->isJoined() === false){
                $thread->join();
            }
            unset($refs[$i]);
        }
    }
}

echo 'no thread running anymore.' . PHP_EOL;

/* output
start ...
status: 8 running atm ...

Notice: Undefined offset: 6 in ...\shared_test.php on line 33

Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-6 stopped after 10
status: 7 running atm ...

Notice: Undefined offset: 4 in ...\shared_test.php on line 33

Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-4 stopped after 35
status: 6 running atm ...

Notice: Undefined offset: 7 in ...\shared_test.php on line 33

Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-7 stopped after 43
status: 5 running atm ...
status: 5 running atm ...
status: 5 running atm ...

[...]
*/
?>
Run Code Online (Sandbox Code Playgroud)

Joe*_*ins 8

Threaded对象已经是线程安全的,也就是说,无论何时读取,写入,检查是否存在或删除(取消设置)成员,操作都是原子的 - 没有其他上下文可以执行任何上述操作,而第一次操作发生.对于用户不知道的引擎处理程序也是如此,从最低级别开始的一切都是隐式安全的.

然而,相当重新确定......当逻辑变得更加复杂时,这有明显的限制,比如在设置或使用它做其他事情之前检查成员的存在,就像你在做的那样:虽然对象上的操作是原子的,没有什么可以阻止unset调用isset和读取属性/维度的调用之间的成员的另一个上下文.

这适用于PHP7(pthreads v3 +)

安全和诚信是两件不同的事情.当完整性很重要时,您可以Threaded::synchronized在PHP7中使用它来正确保存它.在PHP5中你也可以保留它,但代码会更复杂,解释也是如此.

如果我理解它的逻辑,你的第二个例子应该无限期地运行.所以我正在使用这个假设来构造正确的代码,我将进一步假设你可能想要在这个无限循环中做什么,并提供一些看似需要的洞察力.

<?php
class Referee extends Threaded {

    public function find(string $ident, Threaded $reference) {
        return $this->synchronized(function () use($ident, $reference) {
            if (isset($this[$ident])) {
                return $this[$ident];
            } else return ($this[$ident] = $reference);
        });
    }

    public function foreach(Closure $closure) {
        $this->synchronized(function() use($closure) {
            foreach ($this as $ident => $reference) {
                $closure($ident, $reference);
            }
        });
    }
}

class Test extends Thread {

    public function __construct(Referee $referee, string $ident, bool $delay) {
        $this->referee = $referee;
        $this->ident   = $ident;
        $this->delay   = $delay;
    }

    public function run() {
        while (1) {
            if ($this->delay) {
                $this->synchronized(function(){
                    $this->wait(1000000);
                });
            }

            $reference = 
                $this->referee->find($this->ident, $this);

            /* do something with reference here, I guess */         

            /* do something with all references here */
            $this->referee->foreach(function($ident, $reference){
                var_dump(Thread::getCurrentThreadId(),
                        $reference->getIdent(), 
                        $reference->isRunning());
            });
        }
    }

    public function getIdent() {
        return $this->ident;
    }

    private $referee;
    private $ident;
    private $delay;
}

$referee = new Referee();
$threads = [];
$thread = 0;
$idents = [
    "smelly",
    "dopey",
    "bashful",
    "grumpy",
    "sneezy",
    "sleepy",
    "happy",
    "naughty"
];

while ($thread < 8) {
    $threads[$thread] = new Test($referee, $idents[$thread], rand(0, 1));
    $threads[$thread]->start();
    $thread++;
}

foreach ($threads as $thread)
    $thread->join();
?>
Run Code Online (Sandbox Code Playgroud)

因此,我们将看看差异,我会告诉你为什么他们是这样,你怎么写他们,你已经知道我们现在不是在谈论安全,而是诚信,你得到了(相当的)非凡的假设,你写的任何东西都是"安全的",正如所解释的那样.

第一个主要区别是:

if ($this->delay) {
    $this->synchronized(function(){
        $this->wait(1000000);
    });
}
Run Code Online (Sandbox Code Playgroud)

这只是一种Thread等待的合适方式,您不必使用Thread自身进行同步,您可以使用任何Threaded对象.正确处理的好处是,如果不清楚,睡眠和睡眠不会使线程处于接受状态,使用::wait确实.

在现实世界,在这里你真的应该永远只等待东西,这将是一个更复杂的块,它可能(也应该)看起来更像:

if ($this->delay) {
    $this->synchronized(function(){
        while ($this->condition) {
            $this->wait(1000000);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

注意:等待超时在技术上等待某些事情,但是,除了已达到超时之外,您可能会被唤醒,并且应该为此准备代码.

这样,另一个上下文能够通知Thread它应该立即停止等待和关闭,或者立即执行一些其他重要操作,只需同步,更改条件并通知Thread.

对于可预测的代码,了解同步,等待和通知工作的方式非常重要.

接下来我们有设置和/或获取参考的逻辑:

$reference = 
    $this->referee->find($this->ident, $this);
Run Code Online (Sandbox Code Playgroud)

哪个调用:

public function find(string $ident, Threaded $reference) {
    return $this->synchronized(function () use($ident, $reference) {
        if (isset($this[$ident])) {
            return $this[$ident];
        } else return ($this[$ident] = $reference);
    });
}
Run Code Online (Sandbox Code Playgroud)

这个命名很糟糕,命名很难,但是您可以看到在执行这些分组操作时,同步会保留完整性.也可以使用相同的方法来获取对另一个对象的引用,并进行一些调整.

我猜你用那个特定的引用做了一些事情($this当前总是这样).我无法猜出是什么.继续 ...

我假设您要对这些中的每一个做一些事情Threads,并且您希望在整个迭代发生时保持数据的完整性:

$this->referee->foreach(function($ident, $reference){
    var_dump(Thread::getCurrentThreadId(),
            $reference->getIdent(), 
            $reference->isRunning());
});
Run Code Online (Sandbox Code Playgroud)

哪个调用:

public function foreach(Closure $closure) {
    $this->synchronized(function() use($closure) {
        foreach ($this as $ident => $reference) {
            $closure($ident, $reference);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

这就是你如何做这样的事情.

值得一提的是,这里不一定需要同步; 就像从正在迭代的数组中删除一个成员一样,如果在迭代发生时取消设置或设置或对对象执行任何其他操作,则不会发生任何不良情况.