Laravel:同步队列系统

Rub*_*ert 5 php message-queue beanstalkd redis laravel-5

我正在尝试建立一个API,该API使用另一台服务器上的队列系统来处理请求。让我开始尝试在没有队列系统的情况下要完成的工作(无权使其保持简单):例如,使用Postman对URL https://example.com/products进行GET请求,将返回JSON字符串,例如

[
    {
        "id": 1,
        "name": "some name",
        ...
    },
    {
        "id": 2,
        "name": "some other name",
        ...
    }.
    ...
]
Run Code Online (Sandbox Code Playgroud)

route / api.php中的代码将类似于:

<php

Route::get('/products', ProductController@index');
Run Code Online (Sandbox Code Playgroud)

以及app / Http / Controllers / ProductController.php中的代码:

<?php

namespace App\Http\Controllers;

class ProductController extends Controller
{
    /**
     * Return the products.
     *
     * @return \Illuminate\Http\Response
     */
    public function index()
    {
        // Logic to get the products.
        return $products->toJson();
    }
}
Run Code Online (Sandbox Code Playgroud)

我要完成的工作是,所有业务逻辑都在另一台运行多个工作程序的服务器上进行处理。以下是我对此的推理。

  • 安全性:万一我们被黑,它很可能是客户端服务器,而不是辅助服务器。由于最后一个具有所有业务逻辑,因此在最坏的情况下,黑客将只能获取传入和传出的请求数据。
  • 多个工作人员:获得产品可能不会花费很长时间,但是可能还有其他要求需要更多时间来处理。在大多数情况下,发出请求的用户将必须等待结果。但是,其他拨打电话的用户不必等待。因此,另一名工人可以接受此请求并处理工作。

这就是我所看到的工作流程:

  • 所有的免费工人都在不停地轮询队列中的工作

    1. 用户提出要求
    2. 客户端服务器接收请求数据并将其放在队列中
    3. 工人从队列中拿出一份工作并处理
    4. 工作程序将结果返回到客户端服务器
    5. 客户端服务器将结果返回给用户

下面是一幅小图,用于清除一切。

  User 1
      _   \
     |     \
       \    \   1.
        \    \  request
         \    \                                  -------------
  result  \    \                                /             \
  5.       \    \                               |  Worker     |
            \    _|                             |  Server     |
             \                                  |  ---------  |
                   -------------                | /         \ |
                  /             \               | | Worker1 | |
                  |  Client     |            /  | |         | |  \
                  |  Server  2. |           /   | \         / |   \
                  |  ---------  |          /    |  ---------  |    \
                  | /         \ |         /     |  ---------  |     \
                  | | Queue   | |        /      | /         \ |      \     ---------
                  | |         | |      |_       | | Worker2 | |       _|  /         \
                  | | Job A   | |               | |         | |           | DB      |
                  | | Job B   | |   3.  <-----  | \         / |  ----->   | Server  |
                  | |         | |       _       |  ---------  |       _   |         |
                  | \         / |      |        |  ...        |        |  \         /
                  |  ---------  |        \      |  ---------  |      /     ---------
                  \             /         \     | /         \ |     /      ---------
                   -------------           \    | | WorkerN | |    /      /         \
              _               4. ?          \   | |         | |   /       | Other   |
               |                                | \         / |           | Servers |
             /    /                             |  ---------  |           |         |
  1.        /    /                              \             /           \         /
  request  /    /                                -------------             ---------
          /    /
         /    /  result
        /    /   5.
       /    /
          |_
   User 2
Run Code Online (Sandbox Code Playgroud)

在Laravel的文档中,我遇到了很多队列,我认为这很容易实现。我开始尝试使用Beanstalkd,但我假设任何队列驱动程序都可以。我偶然发现的问题是队列系统异步工作。结果,客户端服务器仅进行操作而无需等待结果。除非我缺少任何东西,否则似乎没有办法使队列系统同步工作。

当进一步研究Laravel文档时,我遇到了广播。我不确定我是否理解广播100%的概念,但是据我了解,接收似乎发生在Javascript中。我是一名后端开发人员,并希望避免使用Javascript。由于某种原因,我在这里使用javascript感觉不对劲,但是我不确定这种感觉是否合理。

在文档中进一步查找时,我发现了Redis。我主要对Pub / Sub功能感兴趣。我以为客户端服务器可以生成一个唯一值,将其与请求数据一起发送到队列并进行订阅。工作者完成后,可以使用此唯一值发布结果。我当时以为这可以覆盖步骤4的缺失部分。如果这种逻辑首先起作用,我仍然不确定如何在代码中起作用。我主要停留在客户端应侦听并从Redis接收数据的部分。

我可能会错过一些非常简单的东西。知道我对使用PHP进行编程以及通过万维网进行编程的概念还比较陌生。因此,如果您发现逻辑有缺陷,或者逻辑太牵强,请给我一些其他/更好方法的指导。

我听说过Gearman,这似乎可以同步和异步地进行。但是,我想保持清楚,因为我的目的是充分利用Laravel提供的工具。我仍在学习,没有足够的信心使用太多的外部插件。

编辑:这是我走了多远。我还缺少什么?还是我要问的(附近)是不可能的?

用户通话 http://my.domain.com/worker?message=whoop

用户应收到JSON响应

{"message":"you said whoop"}
Run Code Online (Sandbox Code Playgroud)

需要注意的是在响应报头中的内容类型应该是"application/json""text/html; charset=UTF-8"

这是我到目前为止所拥有的:

两个服务器API服务器和WORKER服务器。API服务器接收请求并将其推送到队列(本地Redis)。WORKER服务器上的工作人员在API服务器上处理作业。一旦工人处理了一项工作,该工作的结果就会广播到API服务器。API服务器监听广播的响应并将其发送给用户。这是通过Redis和socket.io完成的。我的问题是,在这一点上,为了发送结果,我发送了一个带有一些Javascript的刀片文件来监听响应。这将导致内容类型为“ text / html; charset = UTF-8”,并在广播工作者的结果后对其进行更新。有没有办法而不是返回视图,而是返回“ wait”直到广播结果?

API服务器:routes \ web.php:

<?php

Route::get('/worker', 'ApiController@workerHello');
Run Code Online (Sandbox Code Playgroud)

API服务器:app \ Http \ Controllers \ ApiController.php

<?php

namespace App\Http\Controllers;

use App\Jobs\WorkerHello;
use Illuminate\Http\Request;

class ApiController extends Controller
{
    /**
     * Dispatch on queue and open the response page
     *
     * @return string
     */
    public function workerHello(Request $request)
    {
        // Dispatch the request to the queue
        $jobId = $this->dispatch(new WorkerHello($request->message));

        return view('response', compact('jobId'));
    }
}
Run Code Online (Sandbox Code Playgroud)

API服务器:app \ Jobs \ WorkerHello.php

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Support\Facades\Log;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class WorkerHello implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $message;

    /**
     * Create a new job instance.
     *
     * @param  string  $message  the message
     * @return void
     */
    public function __construct($message = null)
    {
        $this->message = $message;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {

    }
}
Run Code Online (Sandbox Code Playgroud)

工作者服务器:app \ Jobs \ WorkerHello.php

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use App\Events\WorkerResponse;
use Illuminate\Support\Facades\Log;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class WorkerHello implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $message;

    /**
     * Create a new job instance.
     *
     * @param  string  $message  the message
     * @return void
     */
    public function __construct($message = null)
    {
        $this->message = $message;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        // Do stuff
        $message = json_encode(['message' => 'you said ' . $this->message]);

        event(new WorkerResponse($this->job->getJobId(), $message));
    }
}
Run Code Online (Sandbox Code Playgroud)

工作者服务器:app \ Events \ WorkerResponse.php

<?php

namespace App\Events;

use Illuminate\Broadcasting\Channel;
use Illuminate\Queue\SerializesModels;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;

class WorkerResponse implements ShouldBroadcast
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    protected $jobId;
    public $message;

    /**
     * Create a new event instance.
     *
     * @return void
     */
    public function __construct($jobId, $message)
    {
        $this->jobId = $jobId;
        $this->message = $message;
    }

    /**
     * Get the channels the event should broadcast on.
     *
     * @return Channel|array
     */
    public function broadcastOn()
    {
        return new Channel('worker-response.' . $this->jobId);
    }
}
Run Code Online (Sandbox Code Playgroud)

API服务器:socket.js(与节点一起运行)

var server = require('http').Server();

var io = require('socket.io')(server);

var Redis = require('ioredis');
var redis = new Redis();

redis.psubscribe('worker-response.*');

redis.on('pmessage', function(pattern, channel, message) {
    message = JSON.parse(message);
    io.emit(channel + ':' + message.event, channel, message.data); 
});

server.listen(3000);
Run Code Online (Sandbox Code Playgroud)

API服务器:resources \ views \ response.blade.php

<!doctype html>
<html lang="{{ config('app.locale') }}">
    <head>
    </head>
    <body>
        <div id="app">
            <p>@{{ message }}</p>
        </div>

        <script src="https://cdnjs.cloudflare.com/ajax/libs/vue/2.3.4/vue.min.js"></script>
        <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.0.3/socket.io.js"></script>

        <script>
            var socket = io('http://192.168.10.10:3000');

            new Vue({
                el: '#app',

                data: {
                    message: '',
                },

                mounted: function() {
                    socket.on('worker-response.{{ $jobId }}:App\\Events\\WorkerResponse', function (channel, data) {
                        this.message = data.message;
                    }.bind(this));
                }
            });
        </script>
    </body>
</html>
Run Code Online (Sandbox Code Playgroud)