我希望能够在运行时动态更改计算管道,但似乎GenStage要求在编译时通过subscribe_to: [...]机制定义计算图.有没有办法创建动态计算图?例如,在下面,我想在运行时切换管道图中的"减7"和"减4"顶点.
这可能使用GenStage吗?我可能会有非常复杂的流水线,因此我需要一种能够以复杂方式扩展到更改图形的解决方案,而不是像特殊解决方案那样,例如,将整数参数化为减法.我希望能够添加或删除整个子树,在子树之间切换,并将节点添加到图形中,包括将它们拼接到任何子树的中间,包括主树.
这是最初的制作人:
defmodule GenstageTest.Producer do
use GenStage
def start_link(initial \\ 1) do
GenStage.start_link(__MODULE__, initial, name: __MODULE__)
end
def init(counter), do: {:producer, counter}
def handle_demand(demand, state) do
events = Enum.to_list(state..(state + demand - 1))
{:noreply, events, state + demand}
end
end
Run Code Online (Sandbox Code Playgroud)
这是producer_consumers之一:
defmodule GenstageTest.PcTimesFive do
use GenStage
def start_link do
GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
end
def init(state) do
{:producer_consumer, state, subscribe_to: [GenstageTest.PcAddOne]}
end
def handle_events(events, _from, state) do
numbers =
events
|> Enum.map(&(&1 * 5))
{:noreply, …Run Code Online (Sandbox Code Playgroud) 以下场景:GenStage生产者处理Twitter Stream(使用Stream API和ExTwitter)并向GenStage消费者提供一组推文(最大消费者要求的需求).然后消费者只是打印它们.
以下问题:我正在寻找具体的推文,因此并不总是有新推文.如果GenStage生产者返回一个空的事件列表,消费者将停止询问.看到这个问题,JoséValims回复了更多.
我不知道如何解决这个问题.任何帮助是极大的赞赏.这是我到目前为止:
defmodule MyApp.TwitterProducer do
use GenStage
alias MyApp.TwitterStream
def start_link(:ok) do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
# This creates a regular Elixir Stream
# I use this as the state so that not every
# time the consumer asks for new data
# a new stream is initiated
stream = TwitterStream.get_stream
{:producer, stream}
end
def handle_demand(demand, stream) do
# Take tweets from the stream and
# turn them into …Run Code Online (Sandbox Code Playgroud)