可见/流向前看

War*_*ren 7 elixir

我开始学习Elixir并遇到了一个我无法轻易解决的挑战.

我正在尝试创建一个带有Enumerable.t的函数,并返回另一个包含下n个项的Enumerable.t .它与Enum.chunk(e,n,1,[])的行为略有不同,因为数字迭代计数总是等于原始的可枚举计数.我还需要支持Streams

@spec lookahead(Enumerable.t, non_neg_integer) :: Enumerable.t
Run Code Online (Sandbox Code Playgroud)

使用doctest语法可以很好地说明这一点:

iex> lookahead(1..6, 1) |> Enum.to_list
[[1,2],[2,3],[3,4],[4,5],[5,6],[6]]

iex> lookahead(1..4, 2) |> Enum.to_list
[[1,2,3],[2,3,4],[3,4],[4]]

iex> Stream.cycle(1..4) |> lookahead(2) |> Enum.take(5)
[[1,2,3],[2,3,4],[3,4,1],[4,1,2],[1,2,3]]

iex> {:ok,io} = StringIO.open("abcd")
iex> IO.stream(io,1) |> lookahead(2) |> Enum.to_list
[["a","b","c"],["b","c","d"],["c","d"],["d"]]
Run Code Online (Sandbox Code Playgroud)

我已经研究过实现Enumerable.t协议,但还没有完全理解Enumerable.reduce接口.

有没有简洁/优雅的方式这样做?

我的用例是二进制流上的一个小的固定n值(1或2),因此优化版本的额外点.但是,为了学习Elixir,我对多个用例的解决方案很感兴趣.表现很重要.我将针对解决方案和发布的各种n值运行一些基准测试.

基准更新 - 2015年4月8日

已经发布了6个可行的解决方案.有关基准的详细信息,请访问https://gist.github.com/spitsw/fce5304ec6941578e454.基准测试在列表中运行,其中包含500个不同n值的项目.

对于n = 1,得到以下结果:

PatrickSuspend.lookahead    104.90 µs/op
Warren.lookahead            174.00 µs/op
PatrickChunk.lookahead      310.60 µs/op
PatrickTransform.lookahead  357.00 µs/op
Jose.lookahead              647.60 µs/op
PatrickUnfold.lookahead     1484000.00 µs/op
Run Code Online (Sandbox Code Playgroud)

对于n = 50,结果如下:

PatrickSuspend.lookahead    220.80 µs/op
Warren.lookahead            320.60 µs/op
PatrickTransform.lookahead  518.60 µs/op
Jose.lookahead              1390.00 µs/op
PatrickChunk.lookahead      3058.00 µs/op
PatrickUnfold.lookahead     1345000.00 µs/op (faster than n=1)
Run Code Online (Sandbox Code Playgroud)

Pat*_*ity 5

正如评论中所讨论的,我的第一次尝试遇到了一些性能问题,并且不适用于具有副作用的流,例如IO流.我花时间深入挖掘了流库,最后提出了这个解决方案:

defmodule MyStream
  def lookahead(enum, n) do
    step = fn val, _acc -> {:suspend, val} end
    next = &Enumerable.reduce(enum, &1, step)
    &do_lookahead(n, :buffer, [], next, &1, &2)
  end

  # stream suspended
  defp do_lookahead(n, state, buf, next, {:suspend, acc}, fun) do
    {:suspended, acc, &do_lookahead(n, state, buf, next, &1, fun)}
  end

  # stream halted
  defp do_lookahead(_n, _state, _buf, _next, {:halt, acc}, _fun) do
    {:halted, acc}
  end

  # initial buffering
  defp do_lookahead(n, :buffer, buf, next, {:cont, acc}, fun) do
    case next.({:cont, []}) do
      {:suspended, val, next} ->
        new_state = if length(buf) < n, do: :buffer, else: :emit
        do_lookahead(n, new_state, buf ++ [val], next, {:cont, acc}, fun)
      {_, _} ->
        do_lookahead(n, :emit, buf, next, {:cont, acc}, fun)
    end
  end

  # emitting
  defp do_lookahead(n, :emit, [_|rest] = buf, next, {:cont, acc}, fun) do
    case next.({:cont, []}) do
      {:suspended, val, next} ->
        do_lookahead(n, :emit, rest ++ [val], next, fun.(buf, acc), fun)
      {_, _} ->
        do_lookahead(n, :emit, rest, next, fun.(buf, acc), fun)
    end
  end

  # buffer empty, halting
  defp do_lookahead(_n, :emit, [], _next, {:cont, acc}, _fun) do
    {:halted, acc}
  end
end
Run Code Online (Sandbox Code Playgroud)

这一开始可能看起来令人生畏,但实际上并不是那么难.我会尝试为你分解它,但对于像这样的完整例子来说这很难.

让我们从一个更简单的例子开始:一个无休止地重复给定值的流.为了发出流,我们可以返回一个函数,它接受一个累加器和一个函数作为参数.要发出一个值,我们用两个参数调用该函数:要发出的值和累加器.acc累加器是一个由命令(:cont,:suspend:halt)组成的元组,告诉我们消费者希望我们做什么; 我们需要返回的结果取决于操作.如果应该暂停流,我们返回原子的三元素元组:suspended,累加器和枚举继续时将被调用的函数(有时称为"延续").对于:halt命令,我们简单地返回{:halted, acc}并且:cont通过执行如上所述的递归步骤来发出值.整个事情看起来像这样:

defmodule MyStream do
  def repeat(val) do
    &do_repeat(val, &1, &2)
  end

  defp do_repeat(val, {:suspend, acc}, fun) do
    {:suspended, acc, &do_repeat(val, &1, fun)}
  end

  defp do_repeat(_val, {:halt, acc}, _fun) do
    {:halted, acc}
  end

  defp do_repeat(val, {:cont, acc}, fun) do
    do_repeat(val, fun.(val, acc), fun)
  end
end
Run Code Online (Sandbox Code Playgroud)

现在这只是这个难题的一部分.我们可以发出一个流,但是我们还没有处理传入的流.再次,为了解释它是如何工作的,构造一个更简单的例子是有意义的.在这里,我将构建一个可枚举的函数,并为每个值暂停和重新发出.

defmodule MyStream do
  def passthrough(enum) do
    step = fn val, _acc -> {:suspend, val} end
    next = &Enumerable.reduce(enum, &1, step)
    &do_passthrough(next, &1, &2)
  end

  defp do_passthrough(next, {:suspend, acc}, fun) do
    {:suspended, acc, &do_passthrough(next, &1, fun)}
  end

  defp do_passthrough(_next, {:halt, acc}, _fun) do
    {:halted, acc}
  end

  defp do_passthrough(next, {:cont, acc}, fun) do
    case next.({:cont, []}) do
      {:suspended, val, next} ->
        do_passthrough(next, fun.(val, acc), fun)
      {_, _} ->
        {:halted, acc}
    end
  end
end
Run Code Online (Sandbox Code Playgroud)

第一个子句设置next传递给do_passthrough函数的函数.它用于从传入流中获取下一个值.内部使用的步骤函数定义我们为流中的每个项暂停.除了最后一个条款外,其余部分非常相似.在这里,我们调用下一个函数{:cont, []}来获取一个新值并通过case语句处理结果.如果有值,我们会回来{:suspended, val, next},如果没有,则流停止并将其传递给消费者.

我希望能澄清一些关于如何手动在Elixir中构建流的内容.不幸的是,使用流需要大量的样板.如果你lookahead现在回到实现,你会发现只有微小的差异,这是真正有趣的部分.还有两个附加参数:state用于区分步骤:buffer:emit步骤,并在初始缓冲步骤中buffer预先填充n+1项目.在发射阶段,发射当前缓冲区,然后在每次迭代时向左移位.当输入流停止或我们的流直接停止时,我们就完成了.


我将原来的答案留在这里供参考:

这是一个解决方案,用于Stream.unfold/2根据您的规范发出真正的值流.这意味着您需要添加Enum.to_list到前两个示例的末尾以获取实际值.

defmodule MyStream do
  def lookahead(stream, n) do
    Stream.unfold split(stream, n+1), fn
      {[], stream} ->
        nil
      {[_ | buf] = current, stream} ->
        {value, stream} = split(stream, 1)
        {current, {buf ++ value, stream}}
    end
  end

  defp split(stream, n) do
    {Enum.take(stream, n), Stream.drop(stream, n)}
  end
end
Run Code Online (Sandbox Code Playgroud)

一般的想法是我们保持前面的迭代的buf.在每次迭代中,我们发出当前的buf,从流中获取一个值并将其附加到buf的末尾.重复此过程直到buf为空.

例:

iex> MyStream.lookahead(1..6, 1) |> Enum.to_list
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]

iex> MyStream.lookahead(1..4, 2) |> Enum.to_list
[[1, 2, 3], [2, 3, 4], [3, 4], [4]]

iex> Stream.cycle(1..3) |> MyStream.lookahead(2) |> Enum.take(5)
[[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 2, 3], [2, 3, 1]]
Run Code Online (Sandbox Code Playgroud)