如何使我的队列在 F# 上安全(并发)

Jih*_*eon 2 queue f# thread-safety

我在 F# 中制作了一个看起来像队列的数据结构,我不确定是否可以将其称为“队列”,因为它可以在序列中间随机访问并对其进行编辑。

type item =
    {
        name : string
        mutable point : int
    }

type Queue = class
    val mutable q : item []

    new () = { q = Array.empty; }

    member this.enq i =
        let mutable b = true
        try
            this.q <- Array.append this.q [|i|]
        with
            | _ as e -> (b <- false)
        b

    member this.deq (i : byref<item>) =
        let mutable b = true
        try
            i <- this.q.[0]
            this.q <- Array.sub this.q 1 (this.q.Length - 1)
        with
            | _ as e -> (b <- false)
        b

    member this.edit p idx =
        let mutable b = true
        try
            this.q.[idx].point <- p
        with
            | _ as e -> (b <- false)
        b

    member this.print =
        let mutable b = true
        try
            for j = 0 to (this.q.Length - 1) do
                printfn "%s %i" this.q.[j].name this.q.[j].point
        with
            | _ as e -> (b <- false)
        b
end
Run Code Online (Sandbox Code Playgroud)

那么我怎样才能使我的看起来像队列的线程安全呢?

Aar*_*ach 5

除了Just another metaprogrammer给出的答案之外,我在 F# 中处理“我需要 X 的线程安全版本”的一种方法是将其放在邮箱处理器后面。邮箱处理器可以确保事务按顺序、一次处理一个,因此您不必担心并发性。虽然使用不可变数据类型和随之而来的算法当然是更好的选择,但有时您需要使用支持突变的类型或算法,在这种情况下,我喜欢使用 MailboxProcessor 来“保护”该东西,使其成为邮箱处理器的私有状态,其他任何人都无法访问它。然后,需要使用它的消费者通过邮箱处理器来使用它,邮箱处理器确保消费者不会互相混乱。

对于你的情况,你可以这样做:

type private QueueMessage<'a> =
| Enqueue of 'a
| Dequeue of AsyncReplyChannel<'a>
| Edit of int * ('a -> unit)

type SafeQueue<'a> () as this =
    let agent = 
        MailboxProcessor<QueueMessage<'a>>.Start 
        <| fun inbox ->
            let rec loop queue =
                async {
                    let! message = inbox.Receive()
                    match message with
                    | Enqueue item -> 
                        let newState = queue.enqueue item
                        return! loop newState
                    | Dequeue channel ->
                        let item = queue.dequeue()
                        channel.Reply item
                        return! loop queue
                    | Edit (idx, f) ->
                        f <| queue.[idx]
                        return! loop queue

                }
            loop <| new Queue<'a>()

    let enqueue item =
        agent.Post <| Enqueue item

    let dequeue () =
        agent.PostAndReply Dequeue

    let edit idx f =
        agent.Post(Edit (idx, f))

    member __.Enqueue item = enqueue item
    member __.Dequeue () = dequeue ()
    member __.Edit (idx, f) = edit idx f

Run Code Online (Sandbox Code Playgroud)