阅读文档,Stream.resource
似乎只是为了创建一个可以读取/获取值的资源,而不是写入/放入.我理解正确还是我读错了?如果我理解正确,我必须创建什么类型的资源才能从流中写入/放入Collectable
?
您已正确阅读文档,Stream.resource
这只是一种发布值的便捷方法.如果要使用值,也需要实现Collectable
协议,这也是正确的.你可以看看到的源代码File.Stream
,这同时实现了Enumerable
和Collectable
.
出于演示目的,这里是一个ChunkedWriter
模块,它存储值直到缓冲区已满,然后在达到限制时刷新它:
defmodule ChunkedWriter do
def open(chunk_size) do
Agent.start_link fn -> {[], chunk_size} end
end
def write(agent, value) do
Agent.update agent, fn {old_buffer, chunk_size} ->
buffer = [value | old_buffer]
new_buffer = cond do
length(buffer) < chunk_size -> buffer
true -> do_flush(buffer)
end
{new_buffer, chunk_size}
end
end
def flush(agent) do
Agent.update agent, fn {buffer, chunk_size} ->
{do_flush(buffer), chunk_size}
end
end
defp do_flush(buffer) do
buffer |> Enum.reverse |> Enum.each(&IO.puts/1)
IO.puts "---"
[]
end
def close(agent) do
flush(agent)
Agent.stop(agent)
end
def stream(chunk_size) do
%ChunkedWriter.Stream{chunk_size: chunk_size}
end
end
Run Code Online (Sandbox Code Playgroud)
这个模块将使用如下:
writer = ChunkedWriter.open(3)
ChunkedWriter.write(writer, 1)
ChunkedWriter.write(writer, 2)
ChunkedWriter.write(writer, 3)
ChunkedWriter.write(writer, 4)
ChunkedWriter.write(writer, 5)
ChunkedWriter.close(writer)
Run Code Online (Sandbox Code Playgroud)
这输出
1
2
3
---
4
5
---
Run Code Online (Sandbox Code Playgroud)
现在该ChunkedWriter.stream/1
方法只是设置一个结构,然后将其分派到ChunkedWriter.Stream
.这是ChunkedWriter.Stream
具有其Collectable
实现的模块,因此我们可以管理Enumerable
它.
defmodule ChunkedWriter.Stream do
defstruct chunk_size: 1
defimpl Collectable do
def into(stream = %ChunkedWriter.Stream{chunk_size: chunk_size}) do
{:ok, writer} = ChunkedWriter.open(chunk_size)
{stream, fn
_acc, {:cont, value} ->
ChunkedWriter.write(writer, value)
_acc, :done ->
:ok = ChunkedWriter.close(writer)
stream
_, :halt ->
:ok = ChunkedWriter.close(writer)
end}
end
end
end
Run Code Online (Sandbox Code Playgroud)
在行动:
Stream.cycle([1,2,3])
|> Stream.take(10)
|> Stream.into(ChunkedWriter.stream(4))
|> Stream.run
Run Code Online (Sandbox Code Playgroud)
这打印:
1
2
3
1
---
2
3
1
2
---
3
1
---
Run Code Online (Sandbox Code Playgroud)