我已经实现了 symfony Messenger https://symfony.com/doc/current/messenger.html。我想在handler实现的类中获取重试计数MessageHandlerInterface
Messenger 消费者命令抛出错误并根据配置重试 5 次。所以我想在处理程序中获取重试计数,以便我可以相应地添加一些逻辑。
我还尝试创建此答案中提到的中间件/sf/answers/4275511761/。但是这个中间件没有被执行。
我正面临 Symfony Messenger 组件的奇怪行为。我根据文档进行了设置,并按照此处的messenger:stop-workers说明在每次部署时发出信号。但是,我们的系统中出现了一个错误,我可以追溯到该错误是因为 Messenger 工作人员正在使用旧版本的代码。
经过更多调查,这就是我们的设置中发生的情况:
app/console messenger:consume --env=prod -vv async看看会发生什么app/console messenger:stop-workers --env=prod主管管理的工人被限制为 1 小时的处理时间,之后他们将被停止并重新启动。我可以看到supervisord.log这很好用。每小时都有关于进程停止和启动的日志条目。但是没有任何关于他们停止messenger:stop-workers命令的事情。
我正在寻找关于为什么会发生这种情况的想法。我阅读了工作人员的实现,关闭信号是通过缓存发送的,但我没有发现我们的配置有任何问题。
在我的 Symfony 项目中,有一个队列消息处理程序,并且在执行过程中随机出现一个错误:
[2022-10-12T07:31:40.060119+00:00] console.CRITICAL: Error thrown while running command "messenger:consume async --limit=10". Message: "Library error: a socket error occurred" {"exception":"[object] (Symfony\\Component\\Messenger\\Exception
TransportException(code: 0): Library error: a socket error occurred at /var/www/app/vendor/symfony/amqp-messenger/Transport/AmqpReceiver.php:62)
[previous exception] [object] (AMQPException(code: 0): Library error: a socket error occurred at /var/www/app/vendor/symfony/amqp-messenger/Transport/Connection.php:439)","command":"messenger:consume async --limit=10","message":"Library error: a socket error occurred"} []
Run Code Online (Sandbox Code Playgroud)
处理程序执行的 HTTP 请求可能会持续几秒钟,如果 API 速度很慢,单个消息的整个过程甚至可能需要一分钟以上。奇怪的是,问题消失了几个小时,但随后又随机出现。发送到队列的消息越多,就越容易看到异常。
config\packages\messenger.yaml
framework:
messenger:
transports:
# https://symfony.com/doc/current/messenger.html#transport-configuration
async:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options:
exchange:
name: async_exchange
queues:
async: ~
heartbeat: 45
write_timeout: 90
read_timeout: 90 …Run Code Online (Sandbox Code Playgroud) 我正在尝试读取未通过 Symfony Messenger 发送的排队消息(在 RabbitMQ 中)。似乎 Messenger 添加了一些标题,例如
headers:
type: App\Message\Transaction
Run Code Online (Sandbox Code Playgroud)
但是在读取外部消息时,此标头不存在。
那么,有没有办法告诉 Messenger 队列 A 中的每条消息都必须被视为消息类型Transaction?
我今天所拥有的是:
framework:
messenger:
transports:
# Uncomment the following line to enable a transport named "amqp"
amqp:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange:
name: messages
type: direct
queue:
name: queue_messages
routing:
# Route your messages to the transports
'App\Message\Transaction': amqp
Run Code Online (Sandbox Code Playgroud)
我想补充的是:
routing:
# Route your messages to the transports
amqp: 'App\Message\Transaction'
Run Code Online (Sandbox Code Playgroud) 我正在为我工作的公司实施 Messenger。我发现路由键有问题。
我想将一条消息发送到两个队列。另外两个应用程序将处理此队列。一切正常,但是当处理程序抛出异常时我发现了问题。它将消息发送一个或两个重试队列加倍,因为重试队列通过绑定键匹配,这对于这个队列是相同的。
最后通过 3 次重试,我的 dlq 上有 16 条消息。你能帮我解决这个问题吗?是否可以基于队列而不是路由键创建重试策略?
我的配置看起来像:
messenger:
failure_transport: failed
default_bus: command.bus
transports:
async:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
max_delay: 0
exchange:
name: olimp
type: topic
queues:
create_miniature_v1:
binding_keys:
- first
create_miniature_v2:
binding_keys:
- first
failed:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
exchange:
name: olimp_dead
type: topic
queues:
create_miniature_v1_dlq:
binding_keys:
- first
create_miniature_v2_dlq:
binding_keys:
- first
routing:
'Olimp\Messenger\TestEvent': async
buses:
command.bus:
middleware:
- Olimp\Shared\Application\Message\Middleware\EventDispatcher
- doctrine_close_connection
- doctrine_transaction
event.bus:
default_middleware: allow_no_handlers
query.bus: ~
Run Code Online (Sandbox Code Playgroud)
我用这样的戳发送事件: …
#service.yaml
framework:
messenger:
transports:
default: 'amqp://guest:guest@localhost:5672/%2f/messages'
enqueue: 'enqueue://gps'
default_bus: messenger.bus.commands
buses:
messenger.bus.commands: ~
messenger.bus.events: ~
routing:
# Route your messages to the transports
'App\Message\Command\Store\CreateStore': enqueue
Run Code Online (Sandbox Code Playgroud)
。
#enqueue.yaml
enqueue:
transport:
default: 'gps'
gps:
projectId: '%env(GOOGLE_PROJECT_ID)%'
keyFilePath: '%env(GOOGLE_APPLICATION_CREDENTIALS)%'
Run Code Online (Sandbox Code Playgroud)
我的消息/命令CreateStore已正确发送到Cloud Pub / Sub,如下所示:
use App\Message\Command\Store\CreateStore;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;
$command = new CreateStore();
$this->commandBus->dispatch((new Envelope($command))->with(new
TransportConfiguration(
['topic' => 'enqueue.commands']
)));
Run Code Online (Sandbox Code Playgroud)
所有消息/命令都在我的Cloud Pub / …
我正在使用带有 Symfony 4 信使组件的工作人员。
这位工人是
为了在 Symfony 上配置这个工作器,我已经这样做了(中间件很重要):
// config/packages/framework.yaml
framework:
messenger:
buses:
command_bus:
middleware:
# each time a message is handled, the Doctrine connection
# is "pinged" and reconnected if it's closed. Useful
# if your workers run for a long time and the database
# connection is sometimes lost
- doctrine_ping_connection
# After handling, the Doctrine connection is closed,
# which can free up database connections in a worker,
# instead of keeping …Run Code Online (Sandbox Code Playgroud) 我正在开发一个安装了 Symfony Messenger 组件来处理异步消息的应用程序。消息处理程序需要检查某些特定用户的某些权限,例如,如果一个确定的用户具有编辑权限,则是否应该收到一封包含信息的电子邮件。
为了实现这一点,我们使用 Symfony voters,但是当我们没有任何用户登录系统(例如控制台命令和异步消息)时,这是非常烦人的。最好的解决方案是什么?
太感谢了
在最近的项目中,我一直在使用Symfony Messenger实施AMQP。虽然代码让我非常高兴,但是我无法编写用于分派消息的方法的单元测试。
我所有涉及调度消息的代码的测试都产生以下警告:
Class "Symfony\Component\Messenger\Envelope" is declared "final" and cannot be mocked.
Run Code Online (Sandbox Code Playgroud)
我无法在Symfony网站上找到有关Symfony Messenger上的测试的文档。
如何使用涉及消息的单元测试来测试代码?
类:
namespace App\MessageHandler;
use App\Entity\File;
use App\Message\CheckVideoMessage;
use App\Message\GenerateThumbnailMessage;
use App\Message\GetVideoMetadataMessage;
use App\Message\ProcessFileMessage;
use App\Service\MediaProcessorService;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\MessageBusInterface;
class ProcessFileMessageHandler
{
/**
* @var MediaProcessorService
*/
private $mediaProcessorService;
/**
* @var EntityManagerInterface
*/
private $entityManager;
/**
* @var LoggerInterface
*/
private $logger;
/**
* @var MessageBusInterface
*/
private $messageBus;
public function __construct(
MediaProcessorService $mediaProcessorService,
EntityManagerInterface $entityManager,
LoggerInterface $logger,
MessageBusInterface $messageBus …Run Code Online (Sandbox Code Playgroud) 我正在使用 Symfony Messenger,我想继续在处理程序中发送消息,直到它被发送多次。
我怎样才能跟踪它?
这是到目前为止我的处理程序类的代码:
class RetryTestHandler implements MessageHandlerInterface
{
/**
* @var EntityManagerInterface
*/
private $entityManager;
/**
* @var MessageBusInterface
*/
private $bus;
public function __construct(MessageBusInterface $bus, EntityManagerInterface $entityManager)
{
$this->entityManager = $entityManager;
$this->bus = $bus;
}
public function __invoke(RetryTest $message)
{
// TODO: Keep dispatching message until it has been dispatched 10 times?
$this->bus->dispatch(new RetryTest("This is a test!"), [
new DelayStamp(5000)
]);
}
}
Run Code Online (Sandbox Code Playgroud) 我正在使用 Symfony 4.4 和 Symfony Messenger
Messenger 配置包括传输和路由:
messenger:
failure_transport: failed
transports:
async_medium:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
retry_strategy:
max_retries: 3
delay: 1000
failed:
...
routing:
'Name\Space\MessageHandler\SnowplowNotificationHandler': async_medium
Run Code Online (Sandbox Code Playgroud)
尽管处理程序看起来配置正确(当我运行控制台 debug:messenger 时,它显示正确分配给传输)
Messenger
messenger.bus.default
The following messages can be dispatched:
Name\Space\Message\SnowplowMessage
handled by Name\Space\MessageHandler\SnowplowEmailNotificationHandler
Run Code Online (Sandbox Code Playgroud)
消息类SnowplowMessage不会排队,而是立即发送到_invoke()处理程序的方法。
我正在使用 AMQP(RabbitMQ 作为传输)并且它配置正确,因为没有显示错误并且命令控制台信使:setup-transport 正确创建队列
我的 symfony 应用程序有多个实例在单独的 docker 容器中运行。
我已经配置我app.cache使用redis:
framework:
cache:
app: cache.adapter.redis
Run Code Online (Sandbox Code Playgroud)
我有同样的prefix_seed:
framework:
cache:
prefix_seed: 'dev'
Run Code Online (Sandbox Code Playgroud)
结果,我在 redis 中遇到了这样的事情:
1605259288.470950 [0 172.18.0.28:55044] "MGET" "HnMEIyUlZ+:workers.restart_requested_timestamp"
1605259288.471680 [0 172.18.0.28:55044] "SET" "HnMEIyUlZ+:workers.restart_requested_timestamp" "d:1605259288.471522;"
1605259314.483389 [0 172.18.0.29:42884] "MGET" "8TMgMtnOAG:workers.restart_requested_timestamp"
Run Code Online (Sandbox Code Playgroud)
从上面可以看出,2 个不同的实例试图通过相同的键从 redis 中获取值,workers.restart_requested_timestamp但即使使用相同的prefix_seed.
在这个例子中,我使用了 messenger 组件,我想通过stop-workers命令(通过共享 redis)停止在任何地方运行的工人。但一般来说这与缓存配置有关。
如何克服这个问题并告诉两个应用程序使用相同的池?这是什么配置?
通过 Symfony 组件调度事件Messenger和通过 Symfony 组件调度事件之间的基本区别是什么EventDispatcher?