Elixir:Observables

tld*_*ldr 2 stream observablecollection elixir

Elixir流提供迭代,但我找不到任何关于observable的信息(谷歌在这里没有帮助).如果有人能指出我同样的资源,我将不胜感激.

bit*_*ker 5

您可以将Stream和Enum结合起来编写可观察样式的代码.以下是以可观察方式编写的echo服务器的示例:

IO.stream(:stdio, :line) 
|> Stream.map(&String.upcase/1)
|> Enum.each(&IO.write(&1))
Run Code Online (Sandbox Code Playgroud)

基本上,对于发送到标准输入的每一行,它将转换为大写,然后打印回标准输出.这是一个简单的例子,但关键是你需要编写一个observable才能通过Stream和Enum获得.

  • 我还应该澄清一下,由于Observable的推送特性,有一些Observable动作,如merge和take_until,不受流支持.但是,Elixir可枚举是可折叠的,并且可以很好地处理资源,所以大多数可观察的功能*都在那里. (3认同)

Ale*_*lik 5

Elixir中的流是功能组合的抽象.最后,你得到的只是一个函数,调用它将循环输入流并转换它.

为了构建有状态的流,如Twitter4j中的示例(在一秒钟内缓冲新的twitter规则并将它们全部分发到一个列表中),您将需要使用可以具有状态的构建块.在Elixir中,通常将状态封装在进程中.

示例可能如下所示

tweetsPerSecond =
  twitterStream 
  |> SS.buffer({1, :second}) 
  |> SS.map(&length(&1))

SS.subscribe(tweetsPerSecond, fn n -> IO.puts "Got #{n} tweets in the last second" end)
SS.subscribe(tweetsPerSecond, fn n -> IO.puts "Second subscriber" end)
Run Code Online (Sandbox Code Playgroud)

SS是一个我们需要编写以实现可观察功能的新模块.核心思想(据我所知)能够订阅流而无需修改它.

为了使其发挥作用,twitterStream本身应该是一个为其他人消费的事件.Stream在这种情况下你不能使用它,因为它具有"阻塞拉"语义,即在经过一段固定的时间后你将无法中断等待流中的下一个元素.

要在Elixir中实现等效功能,请查看该GenEvent模块.它提供发出和订阅事件的能力.虽然它没有类似流的界面,但并不是我所知道的.