需要有关Async和fsi的帮助

Str*_*ger 4 f# asynchronous sequence f#-interactive

我想编写一些运行一系列F#脚本(.fsx)的代码.问题是我可以拥有数百个脚本,如果我这样做:

let shellExecute program args =
    let startInfo = new ProcessStartInfo()
    do startInfo.FileName        <- program
    do startInfo.Arguments       <- args
    do startInfo.UseShellExecute <- true
    do startInfo.WindowStyle     <- ProcessWindowStyle.Hidden

    //do printfn "%s" startInfo.Arguments 
    let proc = Process.Start(startInfo)
    ()

scripts
|> Seq.iter (shellExecute "fsi")
Run Code Online (Sandbox Code Playgroud)

可能会对我的2GB系统造成太大压力.无论如何,我想通过n批运行脚本,这似乎也是一个很好的学习练习Async(我想这是要走的路).

我已经开始为此编写一些代码但不幸的是它不起作用:

open System.Diagnostics

let p = shellExecute "fsi" @"C:\Users\Stringer\foo.fsx"

async {
    let! exit = Async.AwaitEvent p.Exited
    do printfn "process has exited"
}
|> Async.StartImmediate
Run Code Online (Sandbox Code Playgroud)

foo.fsx只是一个hello world脚本.解决这个问题最常用的方法是什么?

我还想弄清楚是否可以为每个执行脚本检索返回代码,如果没有,找到另一种方法.谢谢!

编辑:

非常感谢您的见解和链接!我学到了很多东西.我只想添加一些代码来并行使用Async.ParallelTomas建议的并行运行批处理.如果我的cut功能有更好的实现,请评论.

module Seq =
  /// Returns a sequence of sequences of N elements from the source sequence.
  /// If the length of the source sequence is not a multiple
  /// of N, last element of the returned sequence will have a length
  /// included between 1 and N-1.
  let cut (count : int) (source : seq<´T>) = 
    let rec aux s length = seq {
      if (length < count) then yield s
      else
        yield Seq.take count s
        if (length <> count) then
          yield! aux (Seq.skip count s) (length - count)
      }
    aux source (Seq.length source)

let batchCount = 2
let filesPerBatch =
  let q = (scripts.Length / batchCount)
  q + if scripts.Length % batchCount = 0 then 0 else 1

let batchs =
  scripts
  |> Seq.cut filesPerBatch
  |> Seq.map Seq.toList
  |> Seq.map loop

Async.RunSynchronously (Async.Parallel batchs) |> ignore
Run Code Online (Sandbox Code Playgroud)

EDIT2:

所以我有一些麻烦让Tomas的守卫代码工作.我想这个f函数必须在AddHandler方法中调用,否则我们永远会松开事件......这是代码:

module Event =
  let guard f (e:IEvent<´Del, ´Args>) = 
    let e = Event.map id e
    { new IEvent<´Args> with 
        member this.AddHandler(d) = e.AddHandler(d); f() //must call f here!
        member this.RemoveHandler(d) = e.RemoveHandler(d); f()
        member this.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }
Run Code Online (Sandbox Code Playgroud)

有趣的事情(正如Tomas所提到的)看起来Exited事件在进程终止时存储在某个地方,即使进程没有以EnableRaisingEventsset为true 开始.当此属性最终设置为true时,将触发该事件.

由于我不确定这是官方规范(也有点偏执),我找到了另一个解决方案,它包括在guard函数中启动进程,因此我们确保代码可以在任何情况下工作:

let createStartInfo program args =
  new ProcessStartInfo
    (FileName = program, Arguments = args, UseShellExecute = false,
     WindowStyle = ProcessWindowStyle.Normal, 
     RedirectStandardOutput = true)

let createProcess info =
  let p = new Process()
  do p.StartInfo           <- info
  do p.EnableRaisingEvents <- true
  p

let rec loop scripts = async { 
  match scripts with 
  | [] -> printfn "FINISHED"
  | script::scripts ->
    let args = sprintf "\"%s\"" script
    let p = createStartInfo "notepad" args |> createProcess
    let! exit =
      p.Exited 
      |> Event.guard (fun () -> p.Start() |> ignore)
      |> Async.AwaitEvent
    let output = p.StandardOutput.ReadToEnd()
    do printfn "\nPROCESSED: %s, CODE: %d, OUTPUT: %A"script p.ExitCode output
    return! loop scripts 
  }
Run Code Online (Sandbox Code Playgroud)

注意我已经用notepad.exe替换了fsi.exe,所以我可以在调试器中逐步重放不同的场景,并自己控制进程的退出.

Tom*_*cek 6

我做了一些实验,这里有一种方法来处理我的帖子和Joel的答案中讨论的问题(我认为目前不起作用,但可以修复).

认为规范Process是它可以Exited在我们设置EnableRaisingEvents属性后触发事件true(并且即使在我们设置属性之前已经完成了进程也会触发事件).为了正确处理这种情况,我们需要将处理程序附加到事件启用事件的引发Exited.

这是一个问题,因为如果我们使用AwaitEvent它将阻止工作流,直到事件触发.AwaitEvent从工作流程调用后我们无法做任何事情(如果我们在调用之前设置属性AwaitEvent,那么我们就会参加比赛......).弗拉基米尔的方法是正确的,但我认为有一种更简单的方法可以解决这个问题.

我将创建一个Event.guard接受事件并返回事件的函数,这允许我们指定处理程序附加到事件之后将执行的一些函数.这意味着如果我们在此函数中执行某些操作(进而触发事件),则将处理该事件.

要将其用于此处讨论的问题,我们需要更改原始解决方案,如下所示.首先,shellExecute函数不能设置EnableRaisingEvents属性(否则,我们可能会丢失事件!).其次,等待代码应如下所示:

let rec loop scripts = async { 
  match scripts with 
  | [] -> printf "FINISHED"
  | script::scripts ->
    let p = shellExecute fsi script 
    let! exit = 
      p.Exited 
        |> Event.guard (fun () -> p.EnableRaisingEvents <- true)
        |> Async.AwaitEvent
    let output = p.StandardOutput.ReadToEnd()
    return! loop scripts  } 
Run Code Online (Sandbox Code Playgroud)

注意使用该Event.guard功能.粗略地说,在工作流将处理程序附加到p.Exited事件之后,提供的lambda函数将运行(并将启用事件的引发).但是,我们已经将处理程序附加到事件中,因此如果这会立即导致事件,我们就没事了!

实现(对于两者EventObservable)看起来像这样:

module Event =
  let guard f (e:IEvent<'Del, 'Args>) = 
    let e = Event.map id e
    { new IEvent<'Args> with 
        member x.AddHandler(d) = e.AddHandler(d)
        member x.RemoveHandler(d) = e.RemoveHandler(d); f()
        member x.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }

module Observable =
  let guard f (e:IObservable<'Args>) = 
    { new IObservable<'Args> with 
        member x.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }
Run Code Online (Sandbox Code Playgroud)

好的是这段代码非常简单.


Tom*_*cek 5

你的方法看起来很棒,我真的很喜欢使用AwaitEvent!将流程执行嵌入到异步工作流中的想法!

可能的原因,为什么它不工作的是,你需要设置EnableRisingEvents的属性Processtrue,如果你希望它永远触发Exited事件(不要问我,为什么你要做到这一点,这听起来很愚蠢的我!)不管怎么说,我在测试时对代码做了一些其他更改,所以这里有一个适合我的版本:

open System
open System.Diagnostics

let shellExecute program args = 
  // Configure process to redirect output (so that we can read it)
  let startInfo = 
    new ProcessStartInfo
      (FileName = program, Arguments = args, UseShellExecute = false,
       WindowStyle = ProcessWindowStyle.Hidden, 
       RedirectStandardOutput = true)

  // Start the process
  // Note: We must enable rising events explicitly here!
  Process.Start(startInfo, EnableRaisingEvents = true)
Run Code Online (Sandbox Code Playgroud)

最重要的是,代码现在设置EnableRaisingEventstrue.我还更改了代码以使用一种语法,在构造它时指定对象的属性(使代码更简洁)并且我更改了一些属性,以便我可以读取输出(RedirectStandardOutput).

现在,我们可以使用该AwaitEvent方法等待进程完成.我假设它fsi包含fsi.exe的路径,这scripts是一个FSX脚本列表.如果要按顺序运行它们,可以使用使用递归实现的循环:

let rec loop scripts = async { 
  match scripts with 
  | [] -> printf "FINISHED"
  | script::scripts ->
    // Start the proces in background
    let p = shellExecute fsi script 
    // Wait until the process completes
    let! exit = Async.AwaitEvent p.Exited 
    // Read the output produced by the process, the exit code
    // is available in the `ExitCode` property of `Process`
    let output = p.StandardOutput.ReadToEnd()
    printfn "\nPROCESSED: %s, CODE: %d\n%A" script p.ExitCode output
    // Process the rest of the scripts
    return! loop scripts  } 

// This starts the workflow on background thread, so that we can
// do other things in the meantime. You need to add `ReadLine`, so that
// the console application doesn't quit immedeiately
loop scripts |> Async.Start
Console.ReadLine() |> ignore    
Run Code Online (Sandbox Code Playgroud)

当然,您也可以并行运行流程(或者例如并行运行2组).为此,您将使用Async.Parallel(以通常的方式).

无论如何,这是一个非常好的例子,在我到目前为止还没有看到它们的地方使用异步工作流.很有意思 :-)

  • 是的,这是一场比赛,参见例如http://v2matveev.blogspot.com/2010/02/event-based-async-pattern-in-f.html (3认同)
  • 很好的答案.进程在调用`Async.AwaitEvent`之前是否存在任何危险(意味着在添加侦听器后不会引发事件)? (2认同)