Kev*_*res 7 rabbitmq symfony symfony-messenger
我正在尝试读取未通过 Symfony Messenger 发送的排队消息(在 RabbitMQ 中)。似乎 Messenger 添加了一些标题,例如
headers:
type: App\Message\Transaction
Run Code Online (Sandbox Code Playgroud)
但是在读取外部消息时,此标头不存在。
那么,有没有办法告诉 Messenger 队列 A 中的每条消息都必须被视为消息类型Transaction?
我今天所拥有的是:
framework:
messenger:
transports:
# Uncomment the following line to enable a transport named "amqp"
amqp:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange:
name: messages
type: direct
queue:
name: queue_messages
routing:
# Route your messages to the transports
'App\Message\Transaction': amqp
Run Code Online (Sandbox Code Playgroud)
我想补充的是:
routing:
# Route your messages to the transports
amqp: 'App\Message\Transaction'
Run Code Online (Sandbox Code Playgroud)
Ryan Weaver 在 symfony 的 slack 上回答了类似的问题:
如果消息不是来自 Messenger,您将需要一个自定义的 Messenger 序列化程序:)
1) 创建一个自定义序列化(从 Messenger 实现 SerializerInterface)并在 Messenger 配置下配置它
2)不知何故,在该序列化器中,您获取 JSON 并将其转换为代码中的某些“消息”对象。如何做到这一点取决于您 - 您需要以某种方式能够查看 JSON 并找出它应该映射到哪个消息类。然后,您可以手动创建该对象并填充数据,或使用 Symfony 的序列化程序。退回前将其包裹在信封中
3) 因为您的序列化程序现在返回一个“消息”对象(如果某种类型),所以 Messenger 使用其正常逻辑来查找该消息的处理程序并执行它们
我根据自己的需求进行了快速实现,由您来适应您的业务逻辑:
1 - 创建一个Serializerwich 实现SerializerInterface:
// I keeped the default serializer, and just override his decode method.
/**
* {@inheritdoc}
*/
public function decode(array $encodedEnvelope): Envelope
{
if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) {
throw new InvalidArgumentException('Encoded envelope should have at least a "body" and some "headers".');
}
if (empty($encodedEnvelope['headers']['action'])) {
throw new InvalidArgumentException('Encoded envelope does not have an "action" header.');
}
// Call a factory to return the Message Class associate with the action
if (!$messageClass = $this->messageFactory->getMessageClass($encodedEnvelope['headers']['action'])) {
throw new InvalidArgumentException(sprintf('"%s" is not a valid action.', $encodedEnvelope['headers']['action']));
}
// ... keep the default Serializer logic
return new Envelope($message, ...$stamps);
}
Run Code Online (Sandbox Code Playgroud)
Message2 -使用工厂检索权限:
class MessageFactory
{
/**
* @param string $action
* @return string|null
*/
public function getMessageClass(string $action)
{
switch($action){
case ActionConstants::POST_MESSAGE :
return PostMessage::class ;
default:
return null;
}
}
}
Run Code Online (Sandbox Code Playgroud)
3) 为 Messenger 配置新的自定义序列化器:
framework:
messenger:
serializer: 'app.my_custom_serializer'
Run Code Online (Sandbox Code Playgroud)
我会尝试更进一步,找到一种直接“连接”队列的方法,会让您知道。
| 归档时间: |
|
| 查看次数: |
1810 次 |
| 最近记录: |