使用 Apache Kafka 作为队列传输的 Symfony Messenger

Ark*_* G. 5 php symfony messenger apache-kafka

我不知道如何配置KafkaSymfony messenger。一切正常rabbitmq(我创建了信使和信使处理程序):

. 环境:

MESSENGER_TRANSPORT_DSN=amqp://user:password@myhost:5672/%2f/messages
Run Code Online (Sandbox Code Playgroud)

配置/包/messenger.yaml

framework:
  messenger:
    transports:
      async: "%env(MESSENGER_TRANSPORT_DSN)%"
Run Code Online (Sandbox Code Playgroud)

.env

MESSENGER_TRANSPORT_DSN=enqueue://node-1.kafka.myhost.com:9092/%2f/messages
Run Code Online (Sandbox Code Playgroud)

配置/包/messenger.yaml

framework:
  messenger:
    transports:
      async: "%env(MESSENGER_TRANSPORT_DSN)%"
Run Code Online (Sandbox Code Playgroud)

请给我最好的例子。谢谢!

Ark*_* G. 9

我的开发:Docker + Centos 7 + PHP73、NGINX。

此配置的解决方案:

1.安装php-rdkafka(重要:版本3.1.x!,修改路径为php;))

yum -y install make librdkafka-devel && git clone --branch 3.1.x https://github.com/arnaud-lb/php-rdkafka.git && cd php-rdkafka && /path/to/php73/root/bin/phpize && ./configure --with-php-config=/path/to/php73/root/bin/php-config && make all -j 5 && make install
Run Code Online (Sandbox Code Playgroud)

2.在php.ini中添加php扩展名

[rdkafka]
extension=rdkafka.so
Run Code Online (Sandbox Code Playgroud)

3. 为 symfony 安装包:

composer req symfony/messenger enqueue/rdkafka enqueue/enqueue-bundle sroze/messenger-enqueue-transport enqueue/async-event-dispatcher
Run Code Online (Sandbox Code Playgroud)

4. 注册 bundles - 添加到 config/bundles.php

Enqueue\Bundle\EnqueueBundle::class => ['all' => true],
Enqueue\MessengerAdapter\Bundle\EnqueueAdapterBundle::class => ['all' => true],
Run Code Online (Sandbox Code Playgroud)

5. 添加文件 config/packages/enqueue.yaml:

enqueue:
  default:
    transport:
      dsn: "rdkafka://"
      global:
        group.id: 'myapp'
        metadata.broker.list: "%env(KAFKA_BROKER_LIST)%"
      topic:
        auto.offset.reset: beginning
      commit_async: true
    client: ~
Run Code Online (Sandbox Code Playgroud)

6. 添加文件 config/packages/messenger.yaml:

framework:
  messenger:
    failure_transport: failed

    transports:
      async:
        dsn:  "%env(MESSENGER_TRANSPORT_DSN)%"
      failed:
        dsn: "doctrine://default?queue_name=failed"

    routing:
      'App\Message\EmailNotification': asyn
Run Code Online (Sandbox Code Playgroud)

7. 添加到 .env:

###> messenger ###
MESSENGER_TRANSPORT_DSN=enqueue://default?topic[name]=messages&queue[name]=messages
KAFKA_BROKER_LIST=node-1.kafka.host:9092,node-2.kafka.host:9092,node-3.kafka.host:9092
###< messenger ###
Run Code Online (Sandbox Code Playgroud)

8. 来自文档的 Messege 和 MessengeHandle: https : //symfony.com/doc/current/messenger.html

9.运行消费者:

php bin/console messenger:consume async
Run Code Online (Sandbox Code Playgroud)

祝你好运!

  • `MESSENGER_TRANSPORT_DSN=enqueue://default?topic[name]=messages&amp;queue[name]=messages` 并确保创建主题消息 - 之后一切都应该正常工作,我花了很多时间调试才发现:) (2认同)