在F#中使用FileHelperAsyncEngine

pra*_*rCS 2 csv filehelpers f# asynchronous

我试图使用FileHelpers将csv文件中的行加载到f#中的Elasticsearch数据库以读取csv.一切都适用于小型测试文件,下面的代码片段一次读取所有记录

let readRows<'T>(filePath:string) =
    let engine = FileHelperEngine(typeof<'T>)

    engine.ReadFile(filePath)
    |> Array.map (fun row -> row :?> 'T)
Run Code Online (Sandbox Code Playgroud)

不幸的是,它需要能够读取更大的文件,其中许多列将在以后逐行丢弃.FileHelperAsyncEngine.BeginReadFile函数返回一个IDisposable.

let readRowsAsync<'T>(filePath:string) =
    let engine = new FileHelperAsyncEngine(typeof<'T>)

    engine.BeginReadFile(filePath:string)
    |> ...
Run Code Online (Sandbox Code Playgroud)

如何进一步将此对象处理为<T> s数组?

Fyo*_*kin 5

根据文档,在你调用之后BeginReadFile,它engine本身就变成了一个可以迭代的可枚举序列(这是一个非常奇怪的设计决策).所以你可以在它上面构建自己的序列:

let readRowsAsync<'T>(filePath:string) = 
  seq {
    let engine = new FileHelperAsyncEngine(typeof<'T>)
    use disposable = engine.BeginReadFile(filePath)

    for r in engine do
      if not (shouldDiscard r) then yield (map r)
  }
Run Code Online (Sandbox Code Playgroud)

请注意,我正在使用use绑定,而不是let.这将确保在序列结束之后丢弃一次性物品,或者消费者停止在它上面迭代.

请注意,以下将无法正常工作,即使它会编译:

let readRowsAsync<'T>(filePath:string) = 
  let engine = new FileHelperAsyncEngine(typeof<'T>)
  use disposable = engine.BeginReadFile(filePath)

  engine |> Seq.filter (not << shouldDiscard) |> Seq.map map
Run Code Online (Sandbox Code Playgroud)

如果你这样做,一次性将在函数返回后被处理,但是在迭代结果枚举之前,因此在它的时间之前关闭文件.为了确保正确处置一次性物品,您必须将整个物品包含在seq表达式中.

如果你真的想使用Seq.filter/ Seq.map而不是for/ yield,你仍然可以这样做,但在seq表达式中,如下所示:

let readRowsAsync<'T>(filePath:string) = 
  seq {
    let engine = new FileHelperAsyncEngine(typeof<'T>)
    use disposable = engine.BeginReadFile(filePath)

    yield! engine |> Seq.filter (not << shouldDiscard) |> Seq.map map
  }
Run Code Online (Sandbox Code Playgroud)

您还可以将过滤和映射从seq表达式中移除(这将使您的函数更可重用),但seq表达式本身必须保留在原位,因为它控制了处理部分:

let readRowsAsync<'T>(filePath:string) = 
  seq {
    let engine = new FileHelperAsyncEngine(typeof<'T>)
    use disposable = engine.BeginReadFile(filePath)

    yield! engine
  }

let results = 
  readRowsAsync<SomeType>( "someFile.txt" )
  |> Seq.filter (not << shouldDiscard) 
  |> Seq.map map
Run Code Online (Sandbox Code Playgroud)

最后,必须注意的是,您应该小心处理这个序列,因为它持有非托管资源(即打开文件):不要长时间保持打开状态,在处理时不要使用阻塞操作,等等