使用 Messenger 读取未通过 Messenger 发送的排队消息

Kev*_*res 7 rabbitmq symfony symfony-messenger

我正在尝试读取未通过 Symfony Messenger 发送的排队消息(在 RabbitMQ 中)。似乎 Messenger 添加了一些标题,例如

headers: 
    type: App\Message\Transaction
Run Code Online (Sandbox Code Playgroud)

但是在读取外部消息时,此标头不存在。

那么,有没有办法告诉 Messenger 队列 A 中的每条消息都必须被视为消息类型Transaction

我今天所拥有的是:

framework:
    messenger:
        transports:
            # Uncomment the following line to enable a transport named "amqp"
            amqp:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: messages
                        type: direct
                    queue:
                        name: queue_messages

        routing:
            # Route your messages to the transports
             'App\Message\Transaction': amqp
Run Code Online (Sandbox Code Playgroud)

我想补充的是:

        routing:
            # Route your messages to the transports
             amqp: 'App\Message\Transaction'
Run Code Online (Sandbox Code Playgroud)

Erw*_*uet 4

Ryan Weaver 在 symfony 的 slack 上回答了类似的问题

如果消息不是来自 Messenger,您将需要一个自定义的 Messenger 序列化程序:)

1) 创建一个自定义序列化(从 Messenger 实现 SerializerInterface)并在 Messenger 配置下配置它

2)不知何故,在该序列化器中,您获取 JSON 并将其转换为代码中的某些“消息”对象。如何做到这一点取决于您 - 您需要以某种方式能够查看 JSON 并找出它应该映射到哪个消息类。然后,您可以手动创建该对象并填充数据,或使用 Symfony 的序列化程序。退回前将其包裹在信封中

3) 因为您的序列化程序现在返回一个“消息”对象(如果某种类型),所以 Messenger 使用其正常逻辑来查找该消息的处理程序并执行它们


我根据自己的需求进行了快速实现,由您来适应您的业务逻辑

1 - 创建一个Serializerwich 实现SerializerInterface


   // I keeped the default serializer, and just override his decode method.

   /**
     * {@inheritdoc}
     */
    public function decode(array $encodedEnvelope): Envelope
    {
        if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) {
            throw new InvalidArgumentException('Encoded envelope should have at least a "body" and some "headers".');
        }

        if (empty($encodedEnvelope['headers']['action'])) {
            throw new InvalidArgumentException('Encoded envelope does not have an "action" header.');
        }

        // Call a factory to return the Message Class associate with the action
        if (!$messageClass = $this->messageFactory->getMessageClass($encodedEnvelope['headers']['action'])) {
            throw new InvalidArgumentException(sprintf('"%s" is not a valid action.', $encodedEnvelope['headers']['action']));
        }

        // ... keep the default Serializer logic

        return new Envelope($message, ...$stamps);
    }
Run Code Online (Sandbox Code Playgroud)

Message2 -使用工厂检索权限:

class MessageFactory
{
    /**
     * @param string $action
     * @return string|null
     */
    public function getMessageClass(string $action)
    {
        switch($action){
            case ActionConstants::POST_MESSAGE :
                return PostMessage::class ;
            default:
                return null;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

3) 为 Messenger 配置新的自定义序列化器:

framework:
  messenger:
    serializer: 'app.my_custom_serializer'
Run Code Online (Sandbox Code Playgroud)

我会尝试更进一步,找到一种直接“连接”队列的方法,会让您知道。