Hob*_*bes 8 php multithreading pthreads websocket ratchet
我正在制作一个html5游戏www.titansoftime.com
我使用 Ratchet 作为 PHP WebSocket 服务器解决方案。它很棒!http://socketo.me/docs/push
我已经使用php pthreads扩展进行了几次独立测试,并且看到了一些非常令人兴奋的结果.它确实有效并且运行良好..只要不包含websockets.
Pthreads提供PHP多线程功能(它确实有效,而且非常棒).http://php.net/manual/en/book.pthreads.php
这就是我做的:
/src/server.php这是启动守护程序的文件.
<?php
session_start();

use Ratchet\Server\IoServer;
use Ratchet\WebSocket\WsServer;
use MyApp\Pusher;

// Composer autoloader first, then the game's own class files.
require __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/../mysql.cls.php';
require_once __DIR__ . '/../game.cls.php';
require_once __DIR__ . '/../model.cls.php';

// Global service objects; Pusher reaches $game through `global $game`.
$mysql = new mysql;
$game  = new game;

$loop   = React\EventLoop\Factory::create();
$pusher = new Pusher();

// Every half second, let the pusher serve all connected clients.
$tick = function () use ($pusher) {
    $pusher->load();
};
$loop->addPeriodicTimer(0.50, $tick);

$webSock = new React\Socket\Server($loop);

// Report when the libevent-backed loop implementation was selected.
if ($loop instanceof \React\EventLoop\LibEventLoop) {
    echo "\n HAS LibEvent";
}

// Binding to 0.0.0.0 means remotes can connect
$webSock->listen(8080, '0.0.0.0');

// Stack the protocol layers from the inside out: WebSocket -> HTTP -> I/O server.
$wsApp     = new WsServer($pusher);
$httpApp   = new Ratchet\Http\HttpServer($wsApp);
$webServer = new IoServer($httpApp, $webSock);

// Blocks here for the lifetime of the daemon.
$loop->run();
Run Code Online (Sandbox Code Playgroud)
一切正常.
/src/MyApp/Pusher.php此类将数据推送到所有连接的用户.
<?php
namespace MyApp;
use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface;
/**
 * Worker thread meant to handle a single connected client off the main loop.
 *
 * NOTE(review): storing a Ratchet connection on a Thread property is reported
 * to fail under pthreads with a "cannot serialize closure" fatal error, so
 * confirm that the connection object can actually cross the thread boundary
 * before relying on this class.
 */
class AsyncThread extends \Thread
{
    /** The client connection this thread works on. */
    public $client;

    /**
     * @param object $client connection exposing `resourceId` and `send()`
     */
    public function __construct($client)
    {
        $this->client = $client;
    }

    /**
     * Thread entry point: look up the user bound to this connection and push
     * a JSON payload to it.
     */
    public function run()
    {
        // do work on $this->client
        // Cast the id before embedding it in SQL so only an integer can reach the query.
        $connectionId = intval($this->client->resourceId);

        // Leading backslash is required: this file lives in namespace MyApp and
        // class names (unlike functions) do not fall back to the global namespace.
        $user = \mysql::assoc('SELECT * from users WHERE connection_id = "' . $connectionId . '"');
        // etc..
        $this->client->send(json_encode(array('foo' => 'bar')));
    }
}
/**
 * WebSocket application component.
 *
 * Keeps a static list of clients that have sent a "connect" task and, each
 * time load() is called, pushes a payload to every one of them.
 */
class Pusher implements MessageComponentInterface
{
    /** Connections that have completed the "connect" handshake message. */
    public static $clients = array();

    #load
    /**
     * Serve every registered client once.
     *
     * When no clients are registered, the MySQL link is pinged and re-opened
     * through the global $game object if the ping fails.
     */
    public static function load()
    {
        // $game is created at script scope; without this declaration the
        // reconnect call below would hit an undefined variable.
        global $game;

        $client_count = count(self::$clients);
        echo "\n\n\n".'Serving to '.$client_count.' clients. '.time();

        if (!$client_count) {
            if (!mysql_ping()) {
                $game->connect();
            }
        }

        $threads = array();
        foreach (self::$clients as $key => $client) {
            // HANDLE CLIENT
            // This works just fine, the only problem is that if I have lets say 50 simultaneous users, the people near the end of the clients array will have to wait till the other users have been processed. This is not desirable
            $client->send(json_encode(array('foo'=>'bar')));

            // So I tried this:
            $threads[$key] = new AsyncThread($client);
            $threads[$key]->start();
            // At this point the AsyncThread class will throw a fatal error complaining about not being able to serialize a closure.
            // If I dont set "$this->client = $client;" in the thread constructor no error appears but now I cant use the data.
            // Also regardless of whether or not I bind the data in the AsyncThread constructor,
            // the connection disappears if I call "new AsyncThread($client)". I cannot explain this behavior.
        }
    }

    /**
     * Handle an incoming message.
     *
     * The message must be JSON with a `task` field. Only the task "connect"
     * is accepted (the sender is added to the client list); anything else
     * closes the connection.
     *
     * @param ConnectionInterface $from sender of the message
     * @param string              $msg  raw message payload
     */
    public function onMessage(ConnectionInterface $from, $msg)
    {
        if (!$msg) {
            echo "\n NO MSG";
            $this->closeConnection($from);
            return;
        }

        $data = json_decode($msg);
        if (!$data) {
            echo "\n NO DATA";
            $this->closeConnection($from);
            return;
        }

        // The payload is untrusted: it may be a scalar, an array, or an object
        // without `task`. Read it defensively and compare strictly so that
        // values such as 0 or true cannot loosely match 'connect'.
        $task = isset($data->task) ? $data->task : null;

        #connect
        if ($task === 'connect') {
            echo "\n".'New connection! ('.$from->resourceId.') '.$from->remoteAddress;
            self::$clients[] = $from;
            return;
        }

        $this->closeConnection($from);
        echo "\nNO TASK CLOSING";
    }

    /**
     * Close a connection, drop it from the client list and clear its
     * connection_id in the users table.
     *
     * Does nothing when $conn is empty or has no resourceId.
     *
     * @param ConnectionInterface|null $conn connection to close
     */
    public function closeConnection($conn)
    {
        global $game;

        if (!$conn || !$conn->resourceId) {
            return;
        }

        $connid = $conn->resourceId;
        $conn->close();

        // Rebuild the list without the closed connection (re-indexed from 0).
        $remaining = array();
        foreach (self::$clients as $client) {
            if ($client->resourceId != $connid) {
                $remaining[] = $client;
            }
        }
        self::$clients = $remaining;

        $game->query('UPDATE users set connection_id = 0 WHERE connection_id = "'.intval($connid).'" LIMIT 1');
        echo "\n".'Connection '.$connid.' has disconnected';
    }

    /** Connection was closed by the peer or the server: clean it up. */
    public function onClose(ConnectionInterface $conn)
    {
        echo "\nCLIENT DROPPED";
        $this->closeConnection($conn);
    }

    /** Nothing to do on open; clients register via a "connect" message instead. */
    public function onOpen(ConnectionInterface $conn)
    {
    }

    /** Any error on a connection results in that connection being closed. */
    public function onError(ConnectionInterface $conn, \Exception $e)
    {
        echo "\nCLIENT ERRORED";
        $this->closeConnection($conn);
    }

    // NOTE(review): the four handlers below are empty and look like WAMP-style
    // hooks; confirm whether anything still calls them.
    public function onSubscribe(ConnectionInterface $conn, $topic)
    {
    }

    public function onUnSubscribe(ConnectionInterface $conn, $topic)
    {
    }

    public function onCall(ConnectionInterface $conn, $id, $topic, array $params)
    {
    }

    public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible)
    {
    }
}
Run Code Online (Sandbox Code Playgroud)
只要我不在事件循环中创建一个线程,这一切都可以正常工作.
我是以错误的方式解决这个问题还是php多线程和websockets不兼容?
检查这个包https://github.com/huyanping/react-multi-process
安装
composer require jenner/react-multi-process
如何使用它?
就这么简单:
// Example: a tiny TCP server that answers every connection with the PID of
// the worker process that accepted it, run across several sub-processes.
$loop = React\EventLoop\Factory::create();

$server = stream_socket_server('tcp://127.0.0.1:4020');
stream_set_blocking($server, 0);

$loop->addReadStream($server, function ($server) use ($loop) {
    $conn = stream_socket_accept($server);
    if ($conn === false) {
        // Accept failed; there is no connection to write to.
        return;
    }

    $data = "pid:" . getmypid() . PHP_EOL;

    // $data is captured by reference so the unsent remainder survives between
    // invocations of the write callback.
    $loop->addWriteStream($conn, function ($conn) use (&$data, $loop) {
        $written = fwrite($conn, $data);

        // Done (everything sent) or failed (write error): stop watching the
        // stream first, then close it.
        if ($written === false || $written === strlen($data)) {
            $loop->removeStream($conn);
            fclose($conn);
            return;
        }

        // Partial write: keep only the bytes that have NOT been sent yet.
        $data = substr($data, $written);
    });
});

// the second param is the sub process count
$master = new \React\Multi\Master($loop, 20);
$master->start();
Run Code Online (Sandbox Code Playgroud)
使用 jenner/simple_fork 的示例如下:
class IoServer
{
    /**
     * Run the application by entering the event loop.
     *
     * With $count <= 1 the loop runs in the current process. Otherwise a
     * fixed pool of $count worker processes is started, each running the loop.
     *
     * NOTE(review): $this->loop is not declared in this excerpt; presumably it
     * is set by the constructor of the full class.
     *
     * @param int $count worker process count
     * @throws \RuntimeException If a loop was not previously specified
     */
    public function run($count = 1)
    {
        if (null === $this->loop) {
            throw new \RuntimeException("A React Loop was not provided during instantiation");
        }

        if ($count <= 1) {
            $this->loop->run();
            return;
        }

        // Hand the loop to the workers through the closure's `use` list
        // instead of relying on $this being bound inside the closure.
        $loop = $this->loop;
        $master = new \Jenner\SimpleFork\FixedPool(function () use ($loop) {
            $loop->run();
        }, $count);
        $master->start();
        $master->keep(true);

        // or just
        // $master = new \React\Multi\Master($this->loop, $count);
        // $master->start();
    }
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4320 次 |
| 最近记录: |