我正在Elixir建立一个工作队列作为学术练习.目前,我的工作人员在创建队列时必须手动注册队列(请参阅参考资料MyQuestion.Worker.start_link).
我希望我的主管在创建/重新启动时向队列中注册可用的工作人员,因为这似乎有助于测试工作人员并最大限度地减少耦合.
有没有办法做我在下面的代码中描述的内容MyQuestion.Supervisor?
defmodule MyQuestion.Supervisor do
use Supervisor
def start_link do
supervisor = Supervisor.start_link(__MODULE__, :ok)
end
def init(:ok) do
children = [
worker(MyQuestion.JobQueue, []),
worker(MyQuestion.Worker, [], id: :worker_0),
worker(MyQuestion.Worker, [], id: :worker_1)]
supervise(children, strategy: :rest_for_one)
end
# LOOKING FOR SOMETHING LIKE THIS
# on worker spawn, I want to add the worker to the queue
def child_spawned(pid, {MyQuestion.Worker, _, _}) do
# add worker to queue
MyQuestion.JobQueue.add_new_worker(pid)
end
# LOOKING FOR SOMETHING LIKE THIS
# I want some way to do the following (imagine the callback existed)
def child_terminated(pid, reason, state)
# with this information I could tell the job queue to mark
# the job associated with the pid as failed and to retry
# or maybe extract the job id from the worker state, etc.
MyQuestion.JobQueue.remove_worker(pid)
MyQuestion.JobQueue.restart_job_for_failed_worker(pid)
end
end
defmodule MyQuestion.JobQueue do
def start_link do
Agent.start_link(fn -> [] end, name: __MODULE__)
end
def new_worker(pid) do
# register pid with agent state in available worker list, etc.
end
def add_job(job_description) do
# find idle worker and run job
<... snip ...>
end
<... snip ...>
end
defmodule MyQuestion.Worker do
use GenServer
def start_link do
# start worker
{:ok, worker} = GenServer.start_link(__MODULE__, [])
# Now we have a worker pid, so we can register that pid with the queue
# I wish this could be in the supervisor or else where.
MyQuestion.JobQueue.add_new_worker(worker)
# must return gen server's start link
{:ok, worker}
end
<... snip ...>
end
Run Code Online (Sandbox Code Playgroud)
他们的关键是调用 \xe2\x80\x93 的组合Process.monitor(pid),然后你将收到对handle_info\xe2\x80\x93 的调用并手动调用Supervisor.start_child它给你 pid。
我以前曾尝试使用handle_info但永远无法调用它。Process.monitor(pid)必须从您想要接收通知的同一进程调用,因此您必须从函数内部调用它,handle_call以将监视器与服务器进程关联起来。可能有一个函数可以将代码作为另一个进程运行(即run_from_process(job_queue_pid, fn -> Process.monitor(pid_to_monitor) end)),但我找不到任何东西。
附件是一个非常简单的作业队列实现。我刚接触 Elixir 一天,所以代码既混乱又不惯用,但我附加它是因为似乎缺乏围绕该主题的示例代码。
\n\n看着HeavyIndustry.JobQueue,handle_info,create_new_worker。此代码有一个明显的问题:它能够在工作程序崩溃时重新启动工作程序,但无法从该代码启动下一个作业的队列(由于需要 inside GenServer.call,handle_info这使我们陷入僵局)。我认为您可以通过将启动作业的进程与跟踪作业的进程分开来解决此问题。如果您运行示例代码,您会注意到最终它会停止运行作业,即使队列中仍有一个作业(:crash)。
defmodule HeavyIndustry.Supervisor do\n use Supervisor\n\n def start_link do\n Supervisor.start_link(__MODULE__, :ok)\n end\n\n def init(:ok) do\n # default to supervising nothing, we will add\n supervise([], strategy: :one_for_one)\n end\n\n def create_children(supervisor, worker_count) do\n # create the job queue. defaults to no workers\n Supervisor.start_child(supervisor, worker(HeavyIndustry.JobQueue, [[supervisor, worker_count]]))\n end\nend\n\ndefmodule HeavyIndustry.JobQueue do\n use GenServer\n\n @job_queue_name __MODULE__\n\n def start_link(args, _) do\n GenServer.start_link(__MODULE__, args, name: @job_queue_name)\n end\n\n def init([supervisor, n]) do\n # set some default state\n state = %{\n supervisor: supervisor,\n max_workers: n,\n jobs: [],\n workers: %{\n idle: [],\n busy: []\n }\n }\n {:ok, state}\n end\n\n def setup() do\n # we want to be aware of worker failures. we hook into this by calling\n # Process.monitor(pid), but this links the calling process with the monitored\n # process. To make sure the calls come to US and not the process that called\n # setup, we create the workers by passing a message to our server process\n state = GenServer.call(@job_queue_name, :setup)\n\n # gross passing the whole state back here to monitor but the monitoring must\n # be started from the server process and we can\'t call GenServer.call from\n # inside the :setup call else we deadlock.\n workers = state.workers.idle\n GenServer.call(@job_queue_name, {:monitor_pids, workers})\n end\n\n def add_job(from, job) do\n # add job to queue\n {:ok, our_job_id} = GenServer.call(@job_queue_name, {:create_job, %{job: job, reply_to: from}})\n\n # try to run the next job\n case GenServer.call(@job_queue_name, :start_next_job) do\n # started our job\n {:ok, started_job_id = ^our_job_id} -> {:ok, :started}\n # started *a* job\n {:ok, _} -> {:ok, :pending}\n # couldnt start any job but its ok...\n {:error, :no_idle_workers} -> {:ok, :pending}\n # something fell over...\n {:error, e} -> {:error, e}\n # yeah I know this is bad.\n _ -> {:ok}\n end\n end\n\n def start_next_job do\n GenServer.call(@job_queue_name, :start_next_job)\n end\n\n ##\n # Internal API\n ##\n\n def handle_call(:setup, _, state) do\n workers = Enum.map(0..(state.max_workers-1), fn (n) ->\n {:ok, pid} = start_new_worker(state.supervisor)\n pid\n end)\n state = %{state | workers: %{state.workers | idle: workers}}\n {:reply, state, state}\n end\n\n defp start_new_worker(supervisor) do\n spec = Supervisor.Spec.worker(HeavyIndustry.Worker, [], id: :"Worker.#{:os.system_time}", restart: :temporary)\n # start worker\n Supervisor.start_child(supervisor, spec)\n end\n\n def handle_call({:monitor_pids, list}, _, state) do\n Enum.each(list, &Process.monitor(&1))\n {:reply, :ok, state}\n end\n\n def handle_call({:create_job, job}, from, state) do\n job = %{\n job: job.job,\n reply_to: job.reply_to,\n id: :os.system_time, # id for task\n status: :pending, # start pending, go active, then remove\n pid: nil\n }\n # add new job to jobs list\n state = %{state | jobs: state.jobs ++ [job]}\n {:reply, {:ok, job.id}, state}\n end\n\n def handle_call(:start_next_job, _, state) do\n IO.puts "==> Start Next Job"\n IO.inspect state\n IO.puts "=================="\n\n reply = case {find_idle_worker(state.workers), find_next_job(state.jobs)} do\n {{:error, :no_idle_workers}, _} ->\n # no workers for job, doesnt matter if we have a job\n {:error, :no_idle_workers}\n\n {_, nil} ->\n # no job, doesnt matter if we have a worker\n {:error, :no_more_jobs}\n\n {{:ok, worker}, job} ->\n # have worker, have job, do work\n\n # update state to set job active and worker busy\n jobs = state.jobs -- [job]\n job = %{job | status: :active, pid: worker}\n jobs = jobs ++ [job]\n\n idle = state.workers.idle -- [worker]\n busy = state.workers.busy ++ [worker]\n\n state = %{state | jobs: jobs, workers: %{idle: idle, busy: busy}}\n\n {:ok, task_id} = Task.start(fn ->\n result = GenServer.call(worker, job.job)\n\n remove_job(job)\n free_worker(worker)\n\n send job.reply_to, %{answer: result, job: job.job}\n\n start_next_job\n end)\n {:ok, job.id}\n end\n\n {:reply, reply, state}\n end\n\n defp find_idle_worker(workers) do\n case workers do\n %{idle: [], busy: _} -> {:error, :no_idle_workers}\n %{idle: [worker | idle], busy: busy} -> {:ok, worker}\n end\n end\n\n defp find_next_job(jobs) do\n jobs |> Enum.find(&(&1.status == :pending))\n end\n\n defp free_worker(worker) do\n GenServer.call(@job_queue_name, {:free_worker, worker})\n end\n defp remove_job(job) do\n GenServer.call(@job_queue_name, {:remove_job, job})\n end\n\n def handle_call({:free_worker, worker}, from, state) do\n idle = state.workers.idle ++ [worker]\n busy = state.workers.busy -- [worker]\n {:reply, :ok, %{state | workers: %{idle: idle, busy: busy}}}\n end\n\n def handle_call({:remove_job, job}, from, state) do\n jobs = state.jobs -- [job]\n {:reply, :ok, %{state | jobs: jobs}}\n end\n\n def handle_info(msg = {reason, ref, :process, pid, _reason}, state) do\n IO.puts "Worker collapsed: #{reason} #{inspect pid}, clear and restart job"\n\n # find job for collapsed worker\n # set job to pending again\n job = Enum.find(state.jobs, &(&1.pid == pid))\n fixed_job = %{job | status: :pending, pid: nil}\n jobs = (state.jobs -- [job]) ++ [fixed_job]\n\n # remote worker from lists\n idle = state.workers.idle -- [pid]\n busy = state.workers.busy -- [pid]\n\n # start new worker\n {:ok, pid} = start_new_worker(state.supervisor)\n\n # add worker from lists\n idle = state.workers.idle ++ [pid]\n\n # cant call GenServer.call from here to monitor pid,\n # so duplicate the code a bit...\n Process.monitor(pid)\n\n # update state\n state = %{state | jobs: jobs, workers: %{idle: idle, busy: busy}}\n\n {:noreply, state}\n end\nend\n\ndefmodule HeavyIndustry.Worker do\n use GenServer\n\n def start_link do\n GenServer.start_link(__MODULE__, :ok)\n end\n\n def init(:ok) do\n # workers have no persistent state\n IO.puts "==> Worker up! #{inspect self}"\n {:ok, nil}\n end\n\n def handle_call({:sum, list}, from, _) do\n sum = Enum.reduce(list, fn (n, acc) -> acc + n end)\n {:reply, sum, nil}\n end\n\n def handle_call({:fib, n}, from, _) do\n sum = fib_calc(n)\n {:reply, sum, nil}\n end\n\n def handle_call({:stop}, from, state) do\n {:stop, "my-stop-reason", "my-stop-reply", state}\n end\n\n def handle_call({:crash}, from, _) do\n {:reply, "this will crash" ++ 1234, nil}\n end\n\n def handle_call({:timeout}, from, _) do\n :timer.sleep 10000\n {:reply, "this will timeout", nil}\n end\n\n # Slow fib\n defp fib_calc(0), do: 0\n defp fib_calc(1), do: 1\n defp fib_calc(n), do: fib_calc(n-1) + fib_calc(n-2)\n\nend\n\ndefmodule Looper do\n def start do\n {:ok, pid} = HeavyIndustry.Supervisor.start_link\n {:ok, job_queue} = HeavyIndustry.Supervisor.create_children(pid, 2)\n HeavyIndustry.JobQueue.setup()\n add_jobs\n loop\n end\n\n def add_jobs do\n jobs = [\n {:sum, [100, 200, 300]},\n {:crash},\n {:fib, 35},\n {:fib, 35},\n {:sum, [88, 88, 99]},\n {:fib, 35},\n {:fib, 35},\n {:fib, 35},\n {:sum, 0..100},\n # {:stop}, # stop not really a failure\n\n {:sum, [88, 88, 99]},\n # {:timeout},\n {:sum, [-1]}\n ]\n Enum.each(jobs, fn (job) ->\n IO.puts "~~~~> Add job: #{inspect job}"\n case HeavyIndustry.JobQueue.add_job(self, job) do\n {:ok, :started} -> IO.puts "~~~~> Started job immediately"\n {:ok, :pending} -> IO.puts "~~~~> Job in queue"\n val -> IO.puts "~~~~> ... val: #{inspect val}"\n end\n end)\n end\n\n def loop do\n receive do\n value ->\n IO.puts "~~~~> Received: #{inspect value}"\n loop\n end\n end\nend\n\nLooper.start\nRun Code Online (Sandbox Code Playgroud)\n