如何在Laravel队列中容纳Amazon FIFO SQS?

Fre*_*ang 10 php amazon-sqs amazon-web-services laravel

亚马逊已经宣布他们新的FIFO SQS服务,我想在Laravel Queue中使用它来解决一些并发问题.

我创建了几个新队列并更改了配置.但是,我收到了一个MissingParameter错误

The request must contain the parameter MessageGroupId.
Run Code Online (Sandbox Code Playgroud)

所以我修改了文件 vendor/laravel/framework/src/Illuminate/Queue/SqsQueue.php

public function pushRaw($payload, $queue = null, array $options = [])
{
    $response = $this->sqs->sendMessage(['QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload,
        'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))]);

    return $response->get('MessageId');
}

public function later($delay, $job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    $delay = $this->getSeconds($delay);

    return $this->sqs->sendMessage([
        'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'DelaySeconds' => $delay,
        'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))
    ])->get('MessageId');
}
Run Code Online (Sandbox Code Playgroud)

我正在使用它APP_ENV作为组ID(它是一个单一的消息队列,所以实际上它并不重要.我只想要一切都是FIFO).

但我仍然收到相同的错误消息.我该怎么办呢?任何帮助,将不胜感激.

(顺便说一句,SDK定义在哪里sendMessage?我可以找到它的存根但我没有找到详细的实现)

Unn*_*wut 18

我想指出那些可能偶然遇到同样问题的人,虽然编辑SqsQueue.php工作,但很容易被一个composer install或多个重置composer update.另一种方法是Illuminate\Queue\Connectors\ConnectorInterface为SQS FIFO 实现一个新的,然后将其添加到Laravel的队列管理器中.

我的方法如下:

  1. 创建一个SqsFifoQueue扩展Illuminate\Queue\SqsQueue但支持SQS FIFO 的新类.
  2. 创建一个SqsFifoConnector扩展的新类,Illuminate\Queue\Connectors\SqsConnector用于建立连接SqsFifoQueue.
  3. 创建一个新的SqsFifoServiceProvider,注册SqsFifoConnector到Laravel的队列管理器.
  4. 加入SqsFifoServiceProvider你的config/app.php.
  5. 更新config/queue.php以使用新的SQS FIFO队列驱动程序.

例:

  1. 创建一个SqsFifoQueue扩展Illuminate\Queue\SqsQueue但支持SQS FIFO 的新类.

    <?php
    
    class SqsFifoQueue extends \Illuminate\Queue\SqsQueue
    {
        public function pushRaw($payload, $queue = null, array $options = [])
        {
            $response = $this->sqs->sendMessage([
                'QueueUrl' => $this->getQueue($queue),
                'MessageBody' => $payload,
                'MessageGroupId' => uniqid(),
                'MessageDeduplicationId' => uniqid(),
            ]);
    
            return $response->get('MessageId');
        }
    }
    
    Run Code Online (Sandbox Code Playgroud)
  2. 创建一个SqsFifoConnector扩展的新类,Illuminate\Queue\Connectors\SqsConnector用于建立连接SqsFifoQueue.

    <?php
    
    use Aws\Sqs\SqsClient;
    use Illuminate\Support\Arr;
    
    class SqsFifoConnector extends \Illuminate\Queue\Connectors\SqsConnector
    {
        public function connect(array $config)
        {
            $config = $this->getDefaultConfiguration($config);
    
            if ($config['key'] && $config['secret']) {
                $config['credentials'] = Arr::only($config, ['key', 'secret']);
            }
    
            return new SqsFifoQueue(
                new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '')
            );
        }
    }
    
    Run Code Online (Sandbox Code Playgroud)
  3. 创建一个新的SqsFifoServiceProvider,注册SqsFifoConnector到Laravel的队列管理器.

    <?php
    
    class SqsFifoServiceProvider extends \Illuminate\Support\ServiceProvider
    {
        public function register()
        {
            $this->app->afterResolving('queue', function ($manager) {
                $manager->addConnector('sqsfifo', function () {
                    return new SqsFifoConnector;
                });
            });
        }
    }
    
    Run Code Online (Sandbox Code Playgroud)
  4. 加入SqsFifoServiceProvider你的config/app.php.

    <?php
    
    return [
        'providers'     => [
            ...
            SqsFifoServiceProvider::class,
        ],
    ];
    
    Run Code Online (Sandbox Code Playgroud)
  5. 更新config/queue.php以使用新的SQS FIFO队列驱动程序.

    <?php
    
    return [
    
        'default' => 'sqsfifo',
    
        'connections' => [
            'sqsfifo' => [
                'driver' => 'sqsfifo',
                'key'    => 'my_key'
                'secret' => 'my_secret',
                'queue'  => 'my_queue_url',
                'region' => 'my_sqs_region',
            ],
        ],
    ];
    
    Run Code Online (Sandbox Code Playgroud)

然后您的队列现在应该支持SQS FIFO队列.

无耻的插件:在完成上述步骤时,我已经创建了一个laravel-sqs-fifo composer包来处理这个问题,访问https://github.com/maqe/laravel-sqs-fifo.

  • 'MessageGroupId' =&gt; uniqid(),打破了 FIFO 保证。那为什么不使用常规队列呢?https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html (4认同)