cot*_*ton 5 php multithreading pthreads synchronized
我正在寻找一种安全快捷的方式来使用共享对象.
我在这里问过这个问题:https://github.com/krakjoe/pthreads/issues/470 但是显然这不是正确的地方.
尝试与许多其他上下文(Thread)共享一个对象(Threaded).所有线程都在更新此分片对象 - 它们可以设置自己的请求,也必须响应其他人的请求.
现在krakjoe回应说锁定/解锁在7中不可用我有问题.
我知道:.synchronized但不知道如何使用它来满足我的需求.
我如何使用:: synchronized来编写类似的方法
编辑:
我写了一个(imo)非常简单的测试脚本.
这个脚本包括没有 SYC /锁/ ...方法大气压.
它应该只显示我想要做的事情.
我仍然在寻找一种方法来使用::使这个共享安全.
码:
<?php
/*
TEST:
create n threads
each will
- Shared::set() its own ref
- check if Shared::exists() its own ref
- Shared::get() its ref back
- call method ::isRunning() at returned val to easily check if is ref or got overwritten by another context
TODO:
using ::synchronized to handle multi-context-access
NOTES:
every method as public to prevent pthreads v2 "Method Modifiers - Special Behaviour"
see: "Method Modifiers - Special Behaviour"
at http://blog.krakjoe.ninja/2015/08/a-letter-from-future.html
*/
class Shared extends Threaded
{
public $data;
public function exists($ident)
{
return isset($this->data[$ident]);
}
public function set($ident, $ref)
{
$return = false;
if(!isset($this->data[$ident])){
$data = $this->data;
$data[$ident] = $ref;
$this->data = $data;
$return = $this->data[$ident];
}
#echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL;
return $return;
}
public function get($ident)
{
$return = false;
if($this->exists($ident) === true){
$data = $this->data;
$return = $data[$ident];
unset($data[$ident]);
$this->data = $data;
}
#echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL;
return $return;
}
}
class T extends Thread
{
public $count;
public function __construct(Shared $Shared, $ident)
{
$this->Shared = $Shared;
$this->ident = $ident;
}
public function run()
{
$slowdown = true;
$this->count = 0;
while(true){
if($slowdown){
// "don't allow usleep or sleep" : https://github.com/krakjoe/pthreads/commit/a157b34057b0f584b4db326f30961b5c760dead8
// loop a bit to simulate work:
$start = microtime(true);
$until = rand(1, 100000)/1000000;
while(microtime(true)-$start < $until){
// ...
}
}
if($this->Shared->exists($this->ident) === true){
$ref = $this->Shared->get($this->ident);
}
else{
$ref = $this->Shared->set($this->ident, $this);
}
// calling a method on $ref -- if not a ref we crash
$ref->isRunning();
unset($ref);
$this->count++;
}
}
}
echo 'start ...' . PHP_EOL;
$n = 8;
$Shared = new Shared();
for($i = 0, $refs = array(); $i < $n; $i++){
$refs[$i] = new T($Shared, $i);
$refs[$i]->start();
}
while(!empty($refs)){
// print status:
if(!isset($t)or microtime(true)-$t > 1){
$t = microtime(true);
echo 'status: ' . count($refs) . ' running atm ...' . PHP_EOL;
}
// join crashed threads:
foreach($refs as $i => $thread){
if($thread->isRunning() === false){
echo 'T-' . $i . ' stopped after ' . $thread->count . PHP_EOL;
if($thread->isJoined() === false){
$thread->join();
}
unset($refs[$i]);
}
}
}
echo 'no thread running anymore.' . PHP_EOL;
/* output
start ...
status: 8 running atm ...
Notice: Undefined offset: 6 in ...\shared_test.php on line 33
Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-6 stopped after 10
status: 7 running atm ...
Notice: Undefined offset: 4 in ...\shared_test.php on line 33
Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-4 stopped after 35
status: 6 running atm ...
Notice: Undefined offset: 7 in ...\shared_test.php on line 33
Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-7 stopped after 43
status: 5 running atm ...
status: 5 running atm ...
status: 5 running atm ...
[...]
*/
?>
Run Code Online (Sandbox Code Playgroud)
Threaded对象已经是线程安全的,也就是说,无论何时读取,写入,检查是否存在或删除(取消设置)成员,操作都是原子的 - 没有其他上下文可以执行任何上述操作,而第一次操作发生.对于用户不知道的引擎处理程序也是如此,从最低级别开始的一切都是隐式安全的.
然而,相当重新确定......当逻辑变得更加复杂时,这有明显的限制,比如在设置或使用它做其他事情之前检查成员的存在,就像你在做的那样:虽然对象上的操作是原子的,没有什么可以阻止unset调用isset和读取属性/维度的调用之间的成员的另一个上下文.
这适用于PHP7(pthreads v3 +)
安全和诚信是两件不同的事情.当完整性很重要时,您可以Threaded::synchronized在PHP7中使用它来正确保存它.在PHP5中你也可以保留它,但代码会更复杂,解释也是如此.
如果我理解它的逻辑,你的第二个例子应该无限期地运行.所以我正在使用这个假设来构造正确的代码,我将进一步假设你可能想要在这个无限循环中做什么,并提供一些看似需要的洞察力.
<?php
class Referee extends Threaded {
public function find(string $ident, Threaded $reference) {
return $this->synchronized(function () use($ident, $reference) {
if (isset($this[$ident])) {
return $this[$ident];
} else return ($this[$ident] = $reference);
});
}
public function foreach(Closure $closure) {
$this->synchronized(function() use($closure) {
foreach ($this as $ident => $reference) {
$closure($ident, $reference);
}
});
}
}
class Test extends Thread {
public function __construct(Referee $referee, string $ident, bool $delay) {
$this->referee = $referee;
$this->ident = $ident;
$this->delay = $delay;
}
public function run() {
while (1) {
if ($this->delay) {
$this->synchronized(function(){
$this->wait(1000000);
});
}
$reference =
$this->referee->find($this->ident, $this);
/* do something with reference here, I guess */
/* do something with all references here */
$this->referee->foreach(function($ident, $reference){
var_dump(Thread::getCurrentThreadId(),
$reference->getIdent(),
$reference->isRunning());
});
}
}
public function getIdent() {
return $this->ident;
}
private $referee;
private $ident;
private $delay;
}
$referee = new Referee();
$threads = [];
$thread = 0;
$idents = [
"smelly",
"dopey",
"bashful",
"grumpy",
"sneezy",
"sleepy",
"happy",
"naughty"
];
while ($thread < 8) {
$threads[$thread] = new Test($referee, $idents[$thread], rand(0, 1));
$threads[$thread]->start();
$thread++;
}
foreach ($threads as $thread)
$thread->join();
?>
Run Code Online (Sandbox Code Playgroud)
因此,我们将看看差异,我会告诉你为什么他们是这样,你怎么写他们,你已经知道我们现在不是在谈论安全,而是诚信,你得到了(相当的)非凡的假设,你写的任何东西都是"安全的",正如所解释的那样.
第一个主要区别是:
if ($this->delay) {
$this->synchronized(function(){
$this->wait(1000000);
});
}
Run Code Online (Sandbox Code Playgroud)
这只是一种Thread等待的合适方式,您不必使用Thread自身进行同步,您可以使用任何Threaded对象.正确处理的好处是,如果不清楚,睡眠和睡眠不会使线程处于接受状态,使用::wait确实.
在现实世界,在这里你真的应该永远只等待的东西,这将是一个更复杂的块,它可能(也应该)看起来更像:
if ($this->delay) {
$this->synchronized(function(){
while ($this->condition) {
$this->wait(1000000);
}
});
}
Run Code Online (Sandbox Code Playgroud)
注意:等待超时在技术上等待某些事情,但是,除了已达到超时之外,您可能会被唤醒,并且应该为此准备代码.
这样,另一个上下文能够通知Thread它应该立即停止等待和关闭,或者立即执行一些其他重要操作,只需同步,更改条件并通知Thread.
对于可预测的代码,了解同步,等待和通知工作的方式非常重要.
接下来我们有设置和/或获取参考的逻辑:
$reference =
$this->referee->find($this->ident, $this);
Run Code Online (Sandbox Code Playgroud)
哪个调用:
public function find(string $ident, Threaded $reference) {
return $this->synchronized(function () use($ident, $reference) {
if (isset($this[$ident])) {
return $this[$ident];
} else return ($this[$ident] = $reference);
});
}
Run Code Online (Sandbox Code Playgroud)
这个命名很糟糕,命名很难,但是您可以看到在执行这些分组操作时,同步会保留完整性.也可以使用相同的方法来获取对另一个对象的引用,并进行一些调整.
我猜你用那个特定的引用做了一些事情($this当前总是这样).我无法猜出是什么.继续 ...
我假设您要对这些中的每一个做一些事情Threads,并且您希望在整个迭代发生时保持数据的完整性:
$this->referee->foreach(function($ident, $reference){
var_dump(Thread::getCurrentThreadId(),
$reference->getIdent(),
$reference->isRunning());
});
Run Code Online (Sandbox Code Playgroud)
哪个调用:
public function foreach(Closure $closure) {
$this->synchronized(function() use($closure) {
foreach ($this as $ident => $reference) {
$closure($ident, $reference);
}
});
}
Run Code Online (Sandbox Code Playgroud)
这就是你如何做这样的事情.
值得一提的是,这里不一定需要同步; 就像从正在迭代的数组中删除一个成员一样,如果在迭代发生时取消设置或设置或对对象执行任何其他操作,则不会发生任何不良情况.
| 归档时间: |
|
| 查看次数: |
2080 次 |
| 最近记录: |