Rub*_*ert 5 php message-queue beanstalkd redis laravel-5
我正在尝试建立一个API,该API使用另一台服务器上的队列系统来处理请求。让我开始尝试在没有队列系统的情况下要完成的工作(无权使其保持简单):例如,使用Postman对URL https://example.com/products进行GET请求,将返回JSON字符串,例如
[
{
"id": 1,
"name": "some name",
...
},
{
"id": 2,
"name": "some other name",
...
}.
...
]
Run Code Online (Sandbox Code Playgroud)
route / api.php中的代码将类似于:
<php
Route::get('/products', ProductController@index');
Run Code Online (Sandbox Code Playgroud)
以及app / Http / Controllers / ProductController.php中的代码:
<?php
namespace App\Http\Controllers;
class ProductController extends Controller
{
/**
* Return the products.
*
* @return \Illuminate\Http\Response
*/
public function index()
{
// Logic to get the products.
return $products->toJson();
}
}
Run Code Online (Sandbox Code Playgroud)
我要完成的工作是,所有业务逻辑都在另一台运行多个工作程序的服务器上进行处理。以下是我对此的推理。
这就是我所看到的工作流程:
所有的免费工人都在不停地轮询队列中的工作
下面是一幅小图,用于清除一切。
User 1
_ \
| \
\ \ 1.
\ \ request
\ \ -------------
result \ \ / \
5. \ \ | Worker |
\ _| | Server |
\ | --------- |
------------- | / \ |
/ \ | | Worker1 | |
| Client | / | | | | \
| Server 2. | / | \ / | \
| --------- | / | --------- | \
| / \ | / | --------- | \
| | Queue | | / | / \ | \ ---------
| | | | |_ | | Worker2 | | _| / \
| | Job A | | | | | | | DB |
| | Job B | | 3. <----- | \ / | -----> | Server |
| | | | _ | --------- | _ | |
| \ / | | | ... | | \ /
| --------- | \ | --------- | / ---------
\ / \ | / \ | / ---------
------------- \ | | WorkerN | | / / \
_ 4. ? \ | | | | / | Other |
| | \ / | | Servers |
/ / | --------- | | |
1. / / \ / \ /
request / / ------------- ---------
/ /
/ / result
/ / 5.
/ /
|_
User 2
Run Code Online (Sandbox Code Playgroud)
在Laravel的文档中,我遇到了很多队列,我认为这很容易实现。我开始尝试使用Beanstalkd,但我假设任何队列驱动程序都可以。我偶然发现的问题是队列系统异步工作。结果,客户端服务器仅进行操作而无需等待结果。除非我缺少任何东西,否则似乎没有办法使队列系统同步工作。
当进一步研究Laravel文档时,我遇到了广播。我不确定我是否理解广播100%的概念,但是据我了解,接收似乎发生在Javascript中。我是一名后端开发人员,并希望避免使用Javascript。由于某种原因,我在这里使用javascript感觉不对劲,但是我不确定这种感觉是否合理。
在文档中进一步查找时,我发现了Redis。我主要对Pub / Sub功能感兴趣。我以为客户端服务器可以生成一个唯一值,将其与请求数据一起发送到队列并进行订阅。工作者完成后,可以使用此唯一值发布结果。我当时以为这可以覆盖步骤4的缺失部分。如果这种逻辑首先起作用,我仍然不确定如何在代码中起作用。我主要停留在客户端应侦听并从Redis接收数据的部分。
我可能会错过一些非常简单的东西。知道我对使用PHP进行编程以及通过万维网进行编程的概念还比较陌生。因此,如果您发现逻辑有缺陷,或者逻辑太牵强,请给我一些其他/更好方法的指导。
我听说过Gearman,这似乎可以同步和异步地进行。但是,我想保持清楚,因为我的目的是充分利用Laravel提供的工具。我仍在学习,没有足够的信心使用太多的外部插件。
编辑:这是我走了多远。我还缺少什么?还是我要问的(附近)是不可能的?
用户通话 http://my.domain.com/worker?message=whoop
用户应收到JSON响应
{"message":"you said whoop"}
Run Code Online (Sandbox Code Playgroud)
需要注意的是在响应报头中的内容类型应该是"application/json"不"text/html; charset=UTF-8"
这是我到目前为止所拥有的:
两个服务器API服务器和WORKER服务器。API服务器接收请求并将其推送到队列(本地Redis)。WORKER服务器上的工作人员在API服务器上处理作业。一旦工人处理了一项工作,该工作的结果就会广播到API服务器。API服务器监听广播的响应并将其发送给用户。这是通过Redis和socket.io完成的。我的问题是,在这一点上,为了发送结果,我发送了一个带有一些Javascript的刀片文件来监听响应。这将导致内容类型为“ text / html; charset = UTF-8”,并在广播工作者的结果后对其进行更新。有没有办法而不是返回视图,而是返回“ wait”直到广播结果?
API服务器:routes \ web.php:
<?php
Route::get('/worker', 'ApiController@workerHello');
Run Code Online (Sandbox Code Playgroud)
API服务器:app \ Http \ Controllers \ ApiController.php
<?php
namespace App\Http\Controllers;
use App\Jobs\WorkerHello;
use Illuminate\Http\Request;
class ApiController extends Controller
{
/**
* Dispatch on queue and open the response page
*
* @return string
*/
public function workerHello(Request $request)
{
// Dispatch the request to the queue
$jobId = $this->dispatch(new WorkerHello($request->message));
return view('response', compact('jobId'));
}
}
Run Code Online (Sandbox Code Playgroud)
API服务器:app \ Jobs \ WorkerHello.php
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Support\Facades\Log;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class WorkerHello implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public $message;
/**
* Create a new job instance.
*
* @param string $message the message
* @return void
*/
public function __construct($message = null)
{
$this->message = $message;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
}
}
Run Code Online (Sandbox Code Playgroud)
工作者服务器:app \ Jobs \ WorkerHello.php
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use App\Events\WorkerResponse;
use Illuminate\Support\Facades\Log;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class WorkerHello implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public $message;
/**
* Create a new job instance.
*
* @param string $message the message
* @return void
*/
public function __construct($message = null)
{
$this->message = $message;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
// Do stuff
$message = json_encode(['message' => 'you said ' . $this->message]);
event(new WorkerResponse($this->job->getJobId(), $message));
}
}
Run Code Online (Sandbox Code Playgroud)
工作者服务器:app \ Events \ WorkerResponse.php
<?php
namespace App\Events;
use Illuminate\Broadcasting\Channel;
use Illuminate\Queue\SerializesModels;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
class WorkerResponse implements ShouldBroadcast
{
use Dispatchable, InteractsWithSockets, SerializesModels;
protected $jobId;
public $message;
/**
* Create a new event instance.
*
* @return void
*/
public function __construct($jobId, $message)
{
$this->jobId = $jobId;
$this->message = $message;
}
/**
* Get the channels the event should broadcast on.
*
* @return Channel|array
*/
public function broadcastOn()
{
return new Channel('worker-response.' . $this->jobId);
}
}
Run Code Online (Sandbox Code Playgroud)
API服务器:socket.js(与节点一起运行)
var server = require('http').Server();
var io = require('socket.io')(server);
var Redis = require('ioredis');
var redis = new Redis();
redis.psubscribe('worker-response.*');
redis.on('pmessage', function(pattern, channel, message) {
message = JSON.parse(message);
io.emit(channel + ':' + message.event, channel, message.data);
});
server.listen(3000);
Run Code Online (Sandbox Code Playgroud)
API服务器:resources \ views \ response.blade.php
<!doctype html>
<html lang="{{ config('app.locale') }}">
<head>
</head>
<body>
<div id="app">
<p>@{{ message }}</p>
</div>
<script src="https://cdnjs.cloudflare.com/ajax/libs/vue/2.3.4/vue.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.0.3/socket.io.js"></script>
<script>
var socket = io('http://192.168.10.10:3000');
new Vue({
el: '#app',
data: {
message: '',
},
mounted: function() {
socket.on('worker-response.{{ $jobId }}:App\\Events\\WorkerResponse', function (channel, data) {
this.message = data.message;
}.bind(this));
}
});
</script>
</body>
</html>
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
700 次 |
| 最近记录: |