我应该使用哪种OTP行为来"无休止"重复任务?

opt*_*fel 20 elixir erlang-otp phoenix-framework

我希望在凤凰应用程序旁边反复运行相同的操作序列(当然,如果某些东西在工作者中刹车,则不会崩溃整个网络应用程序)并且我真的不知道我应该使用GenServer,Elixir的任务,代理人或者我到目前为止没有想过的完全不同的东西.

当我启动我的凤凰应用程序时,工作人员也应该启动,定期拉出一些串行连接值,通过凤凰频道广播它们,收集它们直到@save_interval达到然后计算中位数,通过不同的频道广播中位数和将它写入InfluxDB.现在我有这样的东西(有点工作):

def do_your_thing(serial_pid) do
  Stream.interval(@interval_live)
    |> get_new_values_from_serial(serial_pid)
    |> broadcast!("live-channel:#{@name}")
    |> Enum.take(div(@interval_save, @interval_live))
    |> calculate_medians()
    |> broadcast!("update-channel:#{@name}")
    |> write_to_database()

  do_your_thing(serial_pid) # repeat
end
Run Code Online (Sandbox Code Playgroud)

我只是开始想出所有OTP的东西,并希望你们中的某些人可以帮助我在这里绊倒正确的方向.

Jos*_*lim 32

您应该使用在x秒后发送消息的GenServer(在下面的示例中为60秒):

defmodule MyApp.Worker do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [])
  end

  def init([]) do
    schedule_work()
    {:ok, []}
  end

  def handle_info(:work, state) do
    state = do_work(state)
    schedule_work()
    {:noreply, state}
  end

  defp do_work(state) do
    # Do your work here and return state
  end

  defp schedule_work do
    Process.send_after(self(), :work, 60_000)
  end
end
Run Code Online (Sandbox Code Playgroud)

  • 原因是Task无法接收系统消息.我们想让Stream和朋友知道这些,但它不在1.0中,可能只在1.3中. (4认同)
  • @JoséValim在Elixir 1.3中有更好的方法吗? (3认同)
  • 为什么不在无限循环中定期执行其任务的任务(可能由Stream.interval或Stream.repeatedly提供)?如果这只是一个周期性的拉动,将拉出的数据进一步转发到系统,它实际上并不需要是GenServer,对吧.任务仍然符合OTP标准,对我而言,这项工作似乎更为直接. (2认同)