Kri*_*ote 5 elixir erlang-otp rabbitmq erlang-supervisor
我一直在关注 Elixir 的 RabbitMQ 工作队列教程(Elixir Work Queues),效果非常好。最重要的是,我现在正在尝试让多个消费者启动并由主管进行监控。
事实证明,最后一部分有点棘手。如果我在 2 个单独的 iex 会话中运行以下代码,两者都会从 RabbitMQ 获取和处理消息。
客户(消费者)
defmodule MT.Client do
require Logger
@host Application.get_env(:mt, :host)
@username Application.get_env(:mt, :username)
@password Application.get_env(:mt, :password)
@channel Application.get_env(:mt, :channel)
def start_link do
MT.Client.connect
end
def connect do
{:ok, connection} = AMQP.Connection.open(host: @host, username: @username, password: @password)
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Queue.declare(channel, @channel, durable: true)
AMQP.Basic.qos(channel, prefetch_count: 1)
AMQP.Basic.consume(channel, @channel)
Logger.info "[*] Waiting for messages"
MT.Client.loop(channel)
end
def loop(channel) do
receive do
{:basic_deliver, payload, meta} ->
Logger.info "[x] Received #{payload}"
payload
|> to_char_list
|> Enum.count(fn x -> x == ?. end)
|> Kernel.*(1000)
|> :timer.sleep
Logger.info "[x] Done."
AMQP.Basic.ack(channel, meta.delivery_tag)
MT.Client.loop(channel)
end
end
end
Run Code Online (Sandbox Code Playgroud)
导师
defmodule MT.Client.Supervisor do
use Supervisor
require Logger
@name MTClientSupervisor
def start_link do
Supervisor.start_link(__MODULE__, :ok, name: @name)
end
def init(:ok) do
children = [
worker(MT.Client, [], restart: :transient, id: "MTClient01"),
worker(MT.Client, [], restart: :transient, id: "MTClient02"),
worker(MT.Client, [], restart: :transient, id: "MTClient03")
]
supervise(children, strategy: :one_for_one)
end
end
Run Code Online (Sandbox Code Playgroud)
在 iex 会话中运行该命令时:
iex -S mix
MT.Client.Supervisor.start_link
Run Code Online (Sandbox Code Playgroud)
记录以下内容:
08:46:50.746 [info] [*] Waiting for messages
08:46:50.746 [info] [x] Received {"job":"TestMessage","data":{"message":"message........"}}
08:46:58.747 [info] [x] Done.
08:46:58.748 [info] [x] Received {"job":"TestMessage","data":{"message":"last........"}}
08:47:06.749 [info] [x] Done.
Run Code Online (Sandbox Code Playgroud)
很明显,只有 1 个消费者处于活动状态,正在按顺序消费消息。
在 2 个 iex 会话中运行以下命令:
MT.Client.start_link
Run Code Online (Sandbox Code Playgroud)
我没有在此处添加日志,但在这种情况下,我同时得到 2 个处理消息的消耗
我确信我根本没有掌握 Agent/GenServer/Supervisor 所需的详细信息。任何人都可以指出需要对上面的 MT.Client 和 MT.Client.Supervisor 进行哪些更改才能实现让多个消费者活跃在同一通道上的想法?
还; 我一直在尝试生成消费者代理并使用生成的 pid AMQP.Basic.consume(channel, @channel, pid)- 但这也失败了。
| 归档时间: |
|
| 查看次数: |
1058 次 |
| 最近记录: |