F#异步工作流/任务与免费monad相结合

kag*_*oki 3 f# asynchronous computation-expression async-await free-monad

我正在尝试使用免费的monad模式构建用于消息处理的管道,我的代码看起来像这样:

module PipeMonad =
type PipeInstruction<'msgIn, 'msgOut, 'a> =
    | HandleAsync of 'msgIn * (Async<'msgOut> -> 'a)
    | SendOutAsync of 'msgOut * (Async -> 'a)

let private mapInstruction f = function
    | HandleAsync (x, next) -> HandleAsync (x, next >> f)
    | SendOutAsync (x, next) -> SendOutAsync (x, next >> f)

type PipeProgram<'msgIn, 'msgOut, 'a> =
    | Act of PipeInstruction<'msgIn, 'msgOut, PipeProgram<'msgIn, 'msgOut, 'a>>
    | Stop of 'a

let rec bind f = function
    | Act x -> x |> mapInstruction (bind f) |> Act
    | Stop x -> f x

type PipeBuilder() =
    member __.Bind (x, f) = bind f x
    member __.Return x = Stop x
    member __.Zero () = Stop ()
    member __.ReturnFrom x = x

let pipe = PipeBuilder()
let handleAsync msgIn = Act (HandleAsync (msgIn, Stop))
let sendOutAsync msgOut = Act (SendOutAsync (msgOut, Stop))
Run Code Online (Sandbox Code Playgroud)

我根据这篇文章写的

但是它有那些方法异步是对我很重要(Task最好,但是Async是可以接受的),但是当我创造了我的建设者pipeline,我无法弄清楚如何使用它-我怎么能等待一个Task<'msgOut>Async<'msgOut>这样我就可以把它发送出去并等待这个"发送"任务?

现在我有这段代码:

let pipeline log msgIn =
    pipe {
        let! msgOut = handleAsync msgIn
        let result = async {
            let! msgOut = msgOut
            log msgOut
            return sendOutAsync msgOut
        }
        return result
    }
Run Code Online (Sandbox Code Playgroud)

返回 PipeProgram<'b, 'a, Async<PipeProgram<'c, 'a, Async>>>

Tom*_*cek 6

首先,我认为在F#中使用免费monad非常接近于反模式.这是一个非常抽象的结构,不适合用惯用的F#风格 - 但这是一个偏好的问题,如果你(和你的团队)发现这种编写代码可读且易于理解的方式,那么你当然可以去在这个方向.

出于好奇,我花了一些时间玩你的例子 - 虽然我还没有弄清楚如何完全修复你的例子,但我希望以下可能有助于引导你朝着正确的方向前进.总结是我认为你需要集成Async到你PipeProgram的管道程序本质上是异步的:

type PipeInstruction<'msgIn, 'msgOut, 'a> =
    | HandleAsync of 'msgIn * (Async<'msgOut> -> 'a)
    | SendOutAsync of 'msgOut * (Async<unit> -> 'a)
    | Continue of 'a 

type PipeProgram<'msgIn, 'msgOut, 'a> =
    | Act of Async<PipeInstruction<'msgIn, 'msgOut, PipeProgram<'msgIn, 'msgOut, 'a>>>
    | Stop of Async<'a>
Run Code Online (Sandbox Code Playgroud)

请注意,我必须添加Continue以使我的函数类型检查,但我认为这可能是一个错误的黑客,你可能需要远程.通过这些定义,您可以执行以下操作:

let private mapInstruction f = function
    | HandleAsync (x, next) -> HandleAsync (x, next >> f)
    | SendOutAsync (x, next) -> SendOutAsync (x, next >> f)
    | Continue v -> Continue v

let rec bind (f:'a -> PipeProgram<_, _, _>) = function
    | Act x -> 
        let w = async { 
          let! x = x 
          return mapInstruction (bind f) x }
        Act w
    | Stop x -> 
        let w = async {
          let! x = x
          let pg = f x
          return Continue pg
        }
        Act w

type PipeBuilder() =
    member __.Bind (x, f) = bind f x
    member __.Return x = Stop x
    member __.Zero () = Stop (async.Return())
    member __.ReturnFrom x = x

let pipe = PipeBuilder()
let handleAsync msgIn = Act (async.Return(HandleAsync (msgIn, Stop)))
let sendOutAsync msgOut = Act (async.Return(SendOutAsync (msgOut, Stop)))

let pipeline log msgIn =
    pipe {
        let! msgOut = handleAsync msgIn
        log msgOut
        return! sendOutAsync msgOut
    }

pipeline ignore 0 
Run Code Online (Sandbox Code Playgroud)

现在,PipeProgram<int, unit, unit>通过使用一个作用于命令的递归异步函数,您可以轻松地对此进行评估.


Gru*_*oon 5

以我的理解,免费monad的全部要点是您不公开Async之类的效果,因此我认为不应在PipeInstruction类型中使用它们。解释器是添加效果的地方。

另外,Free Monad仅在Haskell中才有意义,您需要做的就是定义一个仿函数,然后自动获得其余的实现。在F#中,您还必须编写其余代码,因此,使用Free与更传统的解释器模式相比并没有太大好处。您链接到的TurtleProgram代码只是一个实验-我完全不建议将Free用于真实代码。

最后,如果您已经知道将要使用的效果,并且不会有一个以上的解释,那么使用这种方法就没有意义。只有当收益超过复杂性时才有意义。

无论如何,如果您确实想编写一个解释器版本(而不是免费版本),这就是我的方法:

首先,定义没有任何影响的说明。

/// The abstract instruction set
module PipeProgram =

    type PipeInstruction<'msgIn, 'msgOut,'state> =
        | Handle of 'msgIn * ('msgOut -> PipeInstruction<'msgIn, 'msgOut,'state>)
        | SendOut of 'msgOut * (unit -> PipeInstruction<'msgIn, 'msgOut,'state>)
        | Stop of 'state
Run Code Online (Sandbox Code Playgroud)

然后,您可以为其编写一个计算表达式:

/// A computation expression for a PipeProgram
module PipeProgramCE =
    open PipeProgram

    let rec bind f instruction =
        match instruction with
        | Handle (x,next) ->  Handle (x, (next >> bind f))
        | SendOut (x, next) -> SendOut (x, (next >> bind f))
        | Stop x -> f x

    type PipeBuilder() =
        member __.Bind (x, f) = bind f x
        member __.Return x = Stop x
        member __.Zero () = Stop ()
        member __.ReturnFrom x = x

let pipe = PipeProgramCE.PipeBuilder()
Run Code Online (Sandbox Code Playgroud)

然后,您可以开始编写计算表达式。在您开始使用解释器之前,这将有助于刷新设计。

// helper functions for CE
let stop x = PipeProgram.Stop x
let handle x = PipeProgram.Handle (x,stop)
let sendOut x  = PipeProgram.SendOut (x, stop)

let exampleProgram : PipeProgram.PipeInstruction<string,string,string> = pipe {
    let! msgOut1 = handle "In1"
    do! sendOut msgOut1
    let! msgOut2 = handle "In2"
    do! sendOut msgOut2
    return msgOut2
    }
Run Code Online (Sandbox Code Playgroud)

描述了说明之后,就可以编写解释器了。正如我所说,如果您不编写多个解释器,那么也许根本不需要这样做。

这是非异步版本(原为“ Id monad”)的解释器:

module PipeInterpreterSync =
    open PipeProgram

    let handle msgIn =
        printfn "In: %A"  msgIn
        let msgOut = System.Console.ReadLine()
        msgOut

    let sendOut msgOut =
        printfn "Out: %A"  msgOut
        ()

    let rec interpret instruction =
        match instruction with
        | Handle (x, next) ->
            let result = handle x
            result |> next |> interpret
        | SendOut (x, next) ->
            let result = sendOut x
            result |> next |> interpret
        | Stop x ->
            x
Run Code Online (Sandbox Code Playgroud)

这是异步版本:

module PipeInterpreterAsync =
    open PipeProgram

    /// Implementation of "handle" uses async/IO
    let handleAsync msgIn = async {
        printfn "In: %A"  msgIn
        let msgOut = System.Console.ReadLine()
        return msgOut
        }

    /// Implementation of "sendOut" uses async/IO
    let sendOutAsync msgOut = async {
        printfn "Out: %A"  msgOut
        return ()
        }

    let rec interpret instruction =
        match instruction with
        | Handle (x, next) -> async {
            let! result = handleAsync x
            return! result |> next |> interpret
            }
        | SendOut (x, next) -> async {
            do! sendOutAsync x
            return! () |> next |> interpret
            }
        | Stop x -> x
Run Code Online (Sandbox Code Playgroud)