标签: genstage

使用Elixir Genstage的运行时动态计算图

我希望能够在运行时动态更改计算管道,但似乎GenStage要求在编译时通过subscribe_to: [...]机制定义计算图.有没有办法创建动态计算图?例如,在下面,我想在运行时切换管道图中的"减7"和"减4"顶点.

在此输入图像描述

这可能使用GenStage吗?我可能会有非常复杂的流水线,因此我需要一种能够以复杂方式扩展到更改图形的解决方案,而不是像特殊解决方案那样,例如,将整数参数化为减法.我希望能够添加或删除整个子树,在子树之间切换,并将节点添加到图形中,包括将它们拼接到任何子树的中间,包括主树.

请进一步参考编辑

这是最初的制作人:

defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end
Run Code Online (Sandbox Code Playgroud)

这是producer_consumers之一:

defmodule GenstageTest.PcTimesFive do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageTest.PcAddOne]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 * 5))
    {:noreply, …
Run Code Online (Sandbox Code Playgroud)

erlang elixir genstage

13
推荐指数
1
解决办法
281
查看次数

GenStage:如何处理生产者无法提供事件的情况?

以下场景:GenStage生产者处理Twitter Stream(使用Stream APIExTwitter)并向GenStage消费者提供一组推文(最大消费者要求的需求).然后消费者只是打印它们.

以下问题:我正在寻找具体的推文,因此并不总是有新推文.如果GenStage生产者返回一个空的事件列表,消费者将停止询问.看到这个问题,JoséValims回复了更多.

我不知道如何解决这个问题.任何帮助是极大的赞赏.这是我到目前为止:

defmodule MyApp.TwitterProducer do
  use GenStage
  alias MyApp.TwitterStream

  def start_link(:ok) do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # This creates a regular Elixir Stream
    # I use this as the state so that not every
    # time the consumer asks for new data
    # a new stream is initiated
    stream = TwitterStream.get_stream
    {:producer, stream}
  end

  def handle_demand(demand, stream) do
    # Take tweets from the stream and 
    # turn them into …
Run Code Online (Sandbox Code Playgroud)

elixir genstage

4
推荐指数
1
解决办法
855
查看次数

标签 统计

elixir ×2

genstage ×2

erlang ×1