Elixir:我可以使用Stream.resource逐步读取大数据文件吗?

Gav*_*aff 12 elixir

我知道如何使用Stream.resource()从文件中获取前5行并将它们放在列表中.

str = Stream.resource(fn -> File.open!("./data/fidap011.mtx") end,
                fn file ->
                  case IO.read(file, :line) do
                    data when is_binary(data) -> {[data], file}
                    _ -> {:halt, file}
                  end
                end,
                fn file -> File.close(file) end)
str |>  Enum.take(5)
Run Code Online (Sandbox Code Playgroud)

但是,如何从同一个流中获取接下来的5行呢?如果我再次输入:

str |>  Enum.take(5)
Run Code Online (Sandbox Code Playgroud)

我刚刚得到相同的前5行.

我错过了一些明显的东西吗?

最后,我希望从我的流中读取足够的数据,以产生一些处理该数据的进程.当其中一些进程完成时,我希望从同一个流中读取更多内容,从而处理下一组数据等.应该在这里使用Stream.chunk()吗?但是,如果没有一个例子,我似乎无法直觉.

编辑 - 稍后进行几次设计迭代!

为了我的目的,不使用Stream更容易.相反,我简单地使用创建文件指针/进程

{:ok,fp} = File.open("data/fidap011.mtx")

然后我实际上将那个fp传递给了30000个不同的衍生过程,并且当他们喜欢时,他们可以毫不费力地阅读它.这些进程中的每一个都通过从文件中读取新的状态变量来改变其状态.在下面的模块中oR,vR是两个接收消息的"路由器"进程 - 代码是稀疏矩阵/向量乘法器的一部分.

defmodule M_Cells do
 @moduledoc """
 Provides matrix related code
 Each cell process serves for that row & col
 """

 defp get_next_state( fp ) do
    case IO.read( fp, :line ) do
        data when is_binary(data) ->
            [rs,cs,vs] = String.split( data )
            r = String.to_integer(rs)
            c = String.to_integer(cs)
            v = String.to_float(vs)
            {r,c,v}
        _ -> 
            File.close( fp )
            :fail
    end
 end


 defp loop(fp, r,c,v, oR,vR) do
  # Maintains state of Matrix Cell, row, col, value 
  # receives msgs and responds
   receive do

    :start  ->  
        send vR, { :multiply, c, self() }  # get values for operands via router vR
        loop(fp, r,c,v, oR,vR)

    { :multiply, w } ->  # handle request to multiply by w and relay to router oR
        send oR, { :sum, r, v*w }
        case get_next_state( fp ) do # read line from file and fill in rcv
            {r1,c1,v1} ->
                send vR, { :multiply, c1, self() }
                loop(fp, r1,c1,v1, oR,vR)
            _ -> ## error or end of file etc
              ##IO.puts(":kill rcv: #{r},#{c},#{v}")
              Process.exit( self(), :kill )
        end
   end
 end

 # Launch each matrix cell using iteration by tail recursion
 def launch(_fp, _oR,_vR, result, 0) do
   result |> Enum.reverse # reverse is cosmetic, not substantive
 end

 def launch(fp, oR,vR, result, count) do
    #IO.inspect count
    case get_next_state( fp ) do
        {r,c,v} ->
            pid = spawn fn -> loop( fp, r,c,v, oR,vR) end
            launch( fp, oR,vR, [pid|result], count-1 )

        _ -> ## error or end of file etc, skip to count 0
            launch( fp, oR,vR, result, 0 )
    end
 end

end
Run Code Online (Sandbox Code Playgroud)

请享用!

Pat*_*ity 22

作为旁注,从文件创建流是一项常见任务.这已经得到了解决,因此您可以简单地使用File.stream!/3创建流,而无需Stream.resource/3直接使用.

关于你原来的问题:是的,你是对的,Stream.chunk_every/2是去的地方.它将懒惰地将流分成所提供大小的块:

File.stream!("./data/fidap011.mtx") |> Stream.chunk_every(5) |> Enum.each(fn chunk ->
  # do something with chunk
end)
Run Code Online (Sandbox Code Playgroud)