标签: symfony-messenger

Symfony Messenger 在处理程序中获取重试计数

我已经实现了 symfony Messenger https://symfony.com/doc/current/messenger.html。我想在handler实现的类中获取重试计数MessageHandlerInterface

Messenger 消费者命令抛出错误并根据配置重试 5 次。所以我想在处理程序中获取重试计数,以便我可以相应地添加一些逻辑。

我还尝试创建此答案中提到的中间件/sf/answers/4275511761/。但是这个中间件没有被执行。

php symfony symfony-messenger

9
推荐指数
1
解决办法
1805
查看次数

Symfony Messenger 工作人员在主管下运行时不会停止

我正面临 Symfony Messenger 组件的奇怪行为。我根据文档进行了设置,并按照此处的messenger:stop-workers说明在每次部署时发出信号。但是,我们的系统中出现了一个错误,我可以追溯到该错误是因为 Messenger 工作人员正在使用旧版本的代码。

经过更多调查,这就是我们的设置中发生的情况:

  • 一个工人正在运行,由主管管理。
  • 只是为了调试这个特殊情况,我在终端中启动了一个新的工作程序,app/console messenger:consume --env=prod -vv async看看会发生什么
  • 我发出停止命令 app/console messenger:stop-workers --env=prod
  • 我现在希望这两个工人都会被停止(并且主管会重新启动它正在处理的那个)。然而,这不会发生。“调试”工作人员确实停止了,但在主管下运行的工作人员什么也不做。

主管管理的工人被限制为 1 小时的处理时间,之后他们将被停止并重新启动。我可以看到supervisord.log这很好用。每小时都有关于进程停止和启动的日志条目。但是没有任何关于他们停止messenger:stop-workers命令的事情。

我正在寻找关于为什么会发生这种情况的想法。我阅读了工作人员的实现,关闭信号是通过缓存发送的,但我没有发现我们的配置有任何问题。

symfony symfony-messenger

8
推荐指数
1
解决办法
1454
查看次数

库错误:Symfony Messenger 中发生套接字错误

在我的 Symfony 项目中,有一个队列消息处理程序,并且在执行过程中随机出现一个错误:

[2022-10-12T07:31:40.060119+00:00] console.CRITICAL: Error thrown while running command "messenger:consume async --limit=10". Message: "Library error: a socket error occurred" {"exception":"[object] (Symfony\\Component\\Messenger\\Exception
TransportException(code: 0): Library error: a socket error occurred at /var/www/app/vendor/symfony/amqp-messenger/Transport/AmqpReceiver.php:62)
[previous exception] [object] (AMQPException(code: 0): Library error: a socket error occurred at /var/www/app/vendor/symfony/amqp-messenger/Transport/Connection.php:439)","command":"messenger:consume async --limit=10","message":"Library error: a socket error occurred"} []
Run Code Online (Sandbox Code Playgroud)

处理程序执行的 HTTP 请求可能会持续几秒钟,如果 API 速度很慢,单个消息的整个过程甚至可能需要一分钟以上。奇怪的是,问题消失了几个小时,但随后又随机出现。发送到队列的消息越多,就越容易看到异常。

config\packages\messenger.yaml

framework:
    messenger:    
        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            async:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    exchange:
                        name: async_exchange
                    queues:
                        async: ~
                    heartbeat: 45
                    write_timeout: 90
                    read_timeout: 90 …
Run Code Online (Sandbox Code Playgroud)

php amqp symfony symfony-messenger

8
推荐指数
1
解决办法
1716
查看次数

使用 Messenger 读取未通过 Messenger 发送的排队消息

我正在尝试读取未通过 Symfony Messenger 发送的排队消息(在 RabbitMQ 中)。似乎 Messenger 添加了一些标题,例如

headers: 
    type: App\Message\Transaction
Run Code Online (Sandbox Code Playgroud)

但是在读取外部消息时,此标头不存在。

那么,有没有办法告诉 Messenger 队列 A 中的每条消息都必须被视为消息类型Transaction

我今天所拥有的是:

framework:
    messenger:
        transports:
            # Uncomment the following line to enable a transport named "amqp"
            amqp:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: messages
                        type: direct
                    queue:
                        name: queue_messages

        routing:
            # Route your messages to the transports
             'App\Message\Transaction': amqp
Run Code Online (Sandbox Code Playgroud)

我想补充的是:

        routing:
            # Route your messages to the transports
             amqp: 'App\Message\Transaction'
Run Code Online (Sandbox Code Playgroud)

rabbitmq symfony symfony-messenger

7
推荐指数
1
解决办法
1810
查看次数

带有绑定键的 Symfony 信使队列 - 重试策略

我正在为我工​​作的公司实施 Messenger。我发现路由键有问题。

我想将一条消息发送到两个队列。另外两个应用程序将处理此队列。一切正常,但是当处理程序抛出异常时我发现了问题。它将消息发送一个或两个重试队列加倍,因为重试队列通过绑定键匹配,这对于这个队列是相同的。

最后通过 3 次重试,我的 dlq 上有 16 条消息。你能帮我解决这个问题吗?是否可以基于队列而不是路由键创建重试策略?

我的配置看起来像:

messenger:
    failure_transport: failed
    default_bus: command.bus
    transports:
        async:
            dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
            options:
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 2
                    max_delay: 0
                exchange:
                    name: olimp
                    type: topic
                queues:
                    create_miniature_v1:
                        binding_keys:
                            - first
                    create_miniature_v2:
                        binding_keys:
                            - first
        failed:
            dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
            options:
                exchange:
                    name: olimp_dead
                    type: topic
                queues:
                    create_miniature_v1_dlq:
                        binding_keys:
                            - first
                    create_miniature_v2_dlq:
                        binding_keys:
                            - first

    routing:
        'Olimp\Messenger\TestEvent': async

    buses:
        command.bus:
            middleware:
                - Olimp\Shared\Application\Message\Middleware\EventDispatcher
                - doctrine_close_connection
                - doctrine_transaction

        event.bus:
            default_middleware: allow_no_handlers

        query.bus: ~
Run Code Online (Sandbox Code Playgroud)

我用这样的戳发送事件: …

php amqp rabbitmq symfony symfony-messenger

7
推荐指数
1
解决办法
1442
查看次数

如何使用Symfony and Enqueue适配器的Messenger组件使用来自Cloud Pub / Sub的消息?

我使用的工具:


我的配置文件:

#service.yaml

framework:
    messenger:
        transports:
            default: 'amqp://guest:guest@localhost:5672/%2f/messages'
            enqueue: 'enqueue://gps'
        default_bus: messenger.bus.commands
        buses:
            messenger.bus.commands: ~
            messenger.bus.events: ~
        routing:
            # Route your messages to the transports
            'App\Message\Command\Store\CreateStore': enqueue
Run Code Online (Sandbox Code Playgroud)

#enqueue.yaml

enqueue:
    transport:
        default: 'gps'
        gps:
            projectId: '%env(GOOGLE_PROJECT_ID)%'
            keyFilePath: '%env(GOOGLE_APPLICATION_CREDENTIALS)%'
Run Code Online (Sandbox Code Playgroud)

将消息/命令发送到Cloud Pub / Sub队列OK

我的消息/命令CreateStore已正确发送到Cloud Pub / Sub,如下所示:

use App\Message\Command\Store\CreateStore;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;

$command = new CreateStore();
$this->commandBus->dispatch((new Envelope($command))->with(new 
TransportConfiguration(
    ['topic' => 'enqueue.commands']
)));
Run Code Online (Sandbox Code Playgroud)

所有消息/命令都在我的Cloud Pub / …

php symfony google-cloud-pubsub symfony-messenger enqueue

5
推荐指数
0
解决办法
599
查看次数

Symfony 4工作人员使用学说无法正常工作:SQLSTATE [HY000] [2002]连接超时

我正在使用带有 Symfony 4 信使组件的工作人员。

这位工人是

  • 接收消息(来自rabbitMQ)
  • 启动 ffmpeg
  • 对视频进行处理
  • 并将一些内容保存在数据库中。

为了在 Symfony 上配置这个工作器,我已经这样做了(中间件很重要):

// config/packages/framework.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    # each time a message is handled, the Doctrine connection
                    # is "pinged" and reconnected if it's closed. Useful
                    # if your workers run for a long time and the database
                    # connection is sometimes lost
                    - doctrine_ping_connection

                    # After handling, the Doctrine connection is closed,
                    # which can free up database connections in a worker,
                    # instead of keeping …
Run Code Online (Sandbox Code Playgroud)

php symfony symfony-messenger

5
推荐指数
1
解决办法
4725
查看次数

如何在 Symfony Messenger 异步消息处理程序上使用投票者/权限?

我正在开发一个安装了 Symfony Messenger 组件来处理异步消息的应用程序。消息处理程序需要检查某些特定用户的某些权限,例如,如果一个确定的用户具有编辑权限,则是否应该收到一封包含信息的电子邮件。

为了实现这一点,我们使用 Symfony voters,但是当我们没有任何用户登录系统(例如控制台命令和异步消息)时,这是非常烦人的。最好的解决方案是什么?

太感谢了

symfony symfony-messenger symfony-4.4

5
推荐指数
1
解决办法
1846
查看次数

单元测试Symfony Messenger

在最近的项目中,我一直在使用Symfony Messenger实施AMQP。虽然代码让我非常高兴,但是我无法编写用于分派消息的方法的单元测试。

我所有涉及调度消息的代码的测试都产生以下警告:

Class "Symfony\Component\Messenger\Envelope" is declared "final" and cannot be mocked.
Run Code Online (Sandbox Code Playgroud)

我无法在Symfony网站上找到有关Symfony Messenger上的测试的文档。

如何使用涉及消息的单元测试来测试代码?

类:

namespace App\MessageHandler;

use App\Entity\File;
use App\Message\CheckVideoMessage;
use App\Message\GenerateThumbnailMessage;
use App\Message\GetVideoMetadataMessage;
use App\Message\ProcessFileMessage;
use App\Service\MediaProcessorService;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\MessageBusInterface;

class ProcessFileMessageHandler
{
    /**
     * @var MediaProcessorService
     */
    private $mediaProcessorService;

    /**
     * @var EntityManagerInterface
     */
    private $entityManager;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @var MessageBusInterface
     */
    private $messageBus;

    public function __construct(
        MediaProcessorService $mediaProcessorService,
        EntityManagerInterface $entityManager,
        LoggerInterface $logger,
        MessageBusInterface $messageBus …
Run Code Online (Sandbox Code Playgroud)

php phpunit amqp symfony symfony-messenger

4
推荐指数
1
解决办法
937
查看次数

Symfony MessageHandler 计算消息已发送的次数

我正在使用 Symfony Messenger,我想继续在处理程序中发送消息,直到它被发送多次。

我怎样才能跟踪它?

这是到目前为止我的处理程序类的代码:

class RetryTestHandler implements MessageHandlerInterface
{
    /**
    * @var EntityManagerInterface
    */
    private $entityManager;
    /**
     * @var MessageBusInterface
     */
    private $bus;

    public function __construct(MessageBusInterface $bus, EntityManagerInterface $entityManager)
    {
        $this->entityManager = $entityManager;
        $this->bus = $bus;
    }

    public function __invoke(RetryTest $message)
    {
        // TODO: Keep dispatching message until it has been dispatched 10 times?
        $this->bus->dispatch(new RetryTest("This is a test!"), [
            new DelayStamp(5000)
        ]);
    }
}

Run Code Online (Sandbox Code Playgroud)

php symfony symfony4 symfony-messenger

3
推荐指数
1
解决办法
3381
查看次数

尽管在 Symfony Messenger 中将处理程序路由配置为异步,但消息未异步分派

我正在使用 Symfony 4.4 和 Symfony Messenger

Messenger 配置包括传输和路由:

messenger:
    failure_transport: failed

    transports:
        async_medium:
            dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
            retry_strategy:
                max_retries: 3
                delay: 1000
        failed:
            ...

    routing:
        'Name\Space\MessageHandler\SnowplowNotificationHandler': async_medium
Run Code Online (Sandbox Code Playgroud)

尽管处理程序看起来配置正确(当我运行控制台 debug:messenger 时,它显示正确分配给传输)

Messenger

messenger.bus.default

 The following messages can be dispatched:
  Name\Space\Message\SnowplowMessage                                                  
     handled by Name\Space\MessageHandler\SnowplowEmailNotificationHandler  
Run Code Online (Sandbox Code Playgroud)

消息类SnowplowMessage不会排队,而是立即发送到_invoke()处理程序的方法。

我正在使用 AMQP(RabbitMQ 作为传输)并且它配置正确,因为没有显示错误并且命令控制台信使:setup-transport 正确创建队列

php symfony symfony-messenger

3
推荐指数
1
解决办法
1672
查看次数

如何在多个 Symfony 实例(共享缓存池)之间共享应用程序缓存?

我的 symfony 应用程序有多个实例在单独的 docker 容器中运行

我已经配置我app.cache使用redis:

framework:
    cache:
        app: cache.adapter.redis
Run Code Online (Sandbox Code Playgroud)

我有同样的prefix_seed

framework:
    cache:
        prefix_seed: 'dev'
Run Code Online (Sandbox Code Playgroud)

结果,我在 redis 中遇到了这样的事情:

1605259288.470950 [0 172.18.0.28:55044] "MGET" "HnMEIyUlZ+:workers.restart_requested_timestamp"
1605259288.471680 [0 172.18.0.28:55044] "SET" "HnMEIyUlZ+:workers.restart_requested_timestamp" "d:1605259288.471522;"
1605259314.483389 [0 172.18.0.29:42884] "MGET" "8TMgMtnOAG:workers.restart_requested_timestamp"
Run Code Online (Sandbox Code Playgroud)

从上面可以看出,2 个不同的实例试图通过相同的键从 redis 中获取值,workers.restart_requested_timestamp但即使使用相同的prefix_seed.

在这个例子中,我使用了 messenger 组件,我想通过stop-workers命令(通过共享 redis)停止在任何地方运行的工人。但一般来说这与缓存配置有关。

如何克服这个问题并告诉两个应用程序使用相同的池?这是什么配置?

php symfony symfony-cache symfony4 symfony-messenger

2
推荐指数
1
解决办法
499
查看次数

EventDispatcher 和 Messenger 之间的区别

通过 Symfony 组件调度事件Messenger和通过 Symfony 组件调度事件之间的基本区别是什么EventDispatcher

rabbitmq symfony symfony-messenger

0
推荐指数
1
解决办法
1503
查看次数