使用Elixir Genstage的运行时动态计算图

Tho*_*wne 13 erlang elixir genstage

我希望能够在运行时动态更改计算管道,但似乎GenStage要求在编译时通过subscribe_to: [...]机制定义计算图.有没有办法创建动态计算图?例如,在下面,我想在运行时切换管道图中的"减7"和"减4"顶点.

在此输入图像描述

这可能使用GenStage吗?我可能会有非常复杂的流水线,因此我需要一种能够以复杂方式扩展到更改图形的解决方案,而不是像特殊解决方案那样,例如,将整数参数化为减法.我希望能够添加或删除整个子树,在子树之间切换,并将节点添加到图形中,包括将它们拼接到任何子树的中间,包括主树.

请进一步参考编辑

这是最初的制作人:

defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end
Run Code Online (Sandbox Code Playgroud)

这是producer_consumers之一:

defmodule GenstageTest.PcTimesFive do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageTest.PcAddOne]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 * 5))
    {:noreply, numbers, state}
  end
end
Run Code Online (Sandbox Code Playgroud)

这是最终的消费者:

defmodule GenstageTest.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageTest.PcDivTwo]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event, state})
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end
Run Code Online (Sandbox Code Playgroud)

我全部模仿Elixir School Genstage教程.

所有模块和mix.ex都可以在github找到.

@AquarHEAD L.部分回答后3天编辑

我设法让运行时订阅工作.以下是一些修改后的生产者,生产者消费者和消费者:

制片人:

defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end

  def handle_info({:doprint}, state) do
    IO.puts "yep"
    {:noreply, [], state}
  end

  def handle_info({:cancel, sublink}, state) do
    GenStage.cancel sublink, []
    {:noreply, [], state}
  end

end
Run Code Online (Sandbox Code Playgroud)

Producter_consumer:

defmodule GenstageTest.PcAddOne do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 + 1))
    {:noreply, numbers, state}
  end
end
Run Code Online (Sandbox Code Playgroud)

消费者:

defmodule GenstageTest.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect event
      #File.write("/home/tbrowne/scratch/output.txt", 
      #  Kernel.inspect(event) <> " ", [:append])
      :timer.sleep(100)
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end
Run Code Online (Sandbox Code Playgroud)

现在,一旦这些都在lib目录中可用(记得添加{:gen_stage, "~> 0.11"}到你的mix.exs deps中),或者复制并粘贴到IEX中,那么以下内容将完美地运行:

{:ok, p} = GenstageTest.Producer.start_link(0)
{:ok, a1} = GenstageTest.PcAddOne.start_link()
{:ok, c} = GenstageTest.Consumer.start_link()
{:ok, link1} = GenStage.sync_subscribe(a1, to: p, min_demand: 0, max_demand: 1, cancel: :transient)
{:ok, link2} = GenStage.sync_subscribe(c, to: a1, min_demand: 0, max_demand: 1, cancel: :transient)
Run Code Online (Sandbox Code Playgroud)

现在的问题是,我仍然不知道如何取消订阅.有取消功能,还有一个停止功能.GenStage.stop(c)例如似乎什么都不做,而我的各种尝试GenStage.cancel/3只会给出错误.

回顾一下,我现在需要的是能够停止某些阶段并将其替换为其他阶段.取消子语法的语法是什么,从哪里调用?在文档中没有很好地解释,因为没有具体的例子.

小智 5

您可以在运行时绝对更改管道,在GenStage文档中查看第一个示例,您也可以使用该:manual模式来精确控制需求.还有取消订阅API.我认为这些足以动态管理GenStage管道.