Stream.resource只是来自,而不是?

sto*_*oft 5 elixir

阅读文档,Stream.resource似乎只是为了创建一个可以读取/获取值的资源,而不是写入/放入.我理解正确还是我读错了?如果我理解正确,我必须创建什么类型的资源才能从流中写入/放入Collectable

Pat*_*ity 7

您已正确阅读文档,Stream.resource这只是一种发布值的便捷方法.如果要使用值,也需要实现Collectable协议,这也是正确的.你可以看看到的源代码File.Stream,这同时实现了EnumerableCollectable.

出于演示目的,这里是一个ChunkedWriter模块,它存储值直到缓冲区已满,然后在达到限制时刷新它:

defmodule ChunkedWriter do
  def open(chunk_size) do
    Agent.start_link fn -> {[], chunk_size} end
  end

  def write(agent, value) do
    Agent.update agent, fn {old_buffer, chunk_size} ->
      buffer = [value | old_buffer]
      new_buffer = cond do
        length(buffer) < chunk_size -> buffer
        true -> do_flush(buffer)
      end
      {new_buffer, chunk_size}
    end
  end

  def flush(agent) do
    Agent.update agent, fn {buffer, chunk_size} ->
      {do_flush(buffer), chunk_size}
    end
  end

  defp do_flush(buffer) do
    buffer |> Enum.reverse |> Enum.each(&IO.puts/1)
    IO.puts "---"
    []
  end

  def close(agent) do
    flush(agent)
    Agent.stop(agent)
  end

  def stream(chunk_size) do
    %ChunkedWriter.Stream{chunk_size: chunk_size}
  end
end
Run Code Online (Sandbox Code Playgroud)

这个模块将使用如下:

writer = ChunkedWriter.open(3)

ChunkedWriter.write(writer, 1)
ChunkedWriter.write(writer, 2)
ChunkedWriter.write(writer, 3)
ChunkedWriter.write(writer, 4)
ChunkedWriter.write(writer, 5)

ChunkedWriter.close(writer)
Run Code Online (Sandbox Code Playgroud)

这输出

1
2
3
---
4
5
---
Run Code Online (Sandbox Code Playgroud)

现在该ChunkedWriter.stream/1方法只是设置一个结构,然后将其分派到ChunkedWriter.Stream.这是ChunkedWriter.Stream具有其Collectable实现的模块,因此我们可以管理Enumerable它.

defmodule ChunkedWriter.Stream do
  defstruct chunk_size: 1

  defimpl Collectable do
    def into(stream = %ChunkedWriter.Stream{chunk_size: chunk_size}) do
      {:ok, writer} = ChunkedWriter.open(chunk_size)
      {stream, fn
        _acc, {:cont, value} ->
          ChunkedWriter.write(writer, value)
        _acc, :done ->
          :ok = ChunkedWriter.close(writer)
          stream
        _, :halt ->
          :ok = ChunkedWriter.close(writer)
      end}
    end
  end
end
Run Code Online (Sandbox Code Playgroud)

在行动:

Stream.cycle([1,2,3])
|> Stream.take(10)
|> Stream.into(ChunkedWriter.stream(4))
|> Stream.run
Run Code Online (Sandbox Code Playgroud)

这打印:

1
2
3
1
---
2
3
1
2
---
3
1
---
Run Code Online (Sandbox Code Playgroud)