标签: mailboxprocessor

PostAndReply 在已处理的 MailboxProcessor 上

当其 MailboxProcessor 被处理(或以其他方式停止)时,是否可以让 PostAndAsyncReply 立即返回?或者是否有一些关于如何安全地使用 PostAndReply 方法而不造成死锁的“模式”/最佳实践?

现在我遇到的问题是 PostAndAsyncReply 在 MailboxProcessor 被处理后永远不会返回。使用 timeout 参数不是一种选择,因为我迫不及待(此外,选择合理的超时非常困难或不可能,因为它取决于太多因素)。

  [<Test>]
  let ``waiting for a reply from a disposed agent``() =
    use server = MailboxProcessor.Start(fun inbox -> async {
      ()
    })

    (server :> System.IDisposable).Dispose()

    server.PostAndReply (fun reply -> reply) // <- deadlock
    |> ignore)
Run Code Online (Sandbox Code Playgroud)

编辑:我见过的大多数邮箱处理器示例(包括 MSDN 上的示例)甚至不介意处理邮箱处理器。并且 MSDN 没有解释 MailboxProcessors 在被处理时如何反应。没有必要处置它们吗?

f# mailboxprocessor

6
推荐指数
1
解决办法
641
查看次数

如何正确使用F#中的TryScan

我试图找到一个关于如何使用的例子TryScan,但没有找到任何,你能帮助我吗?

我想做什么(非常简单的例子):我有一个MailboxProcessor接受两种类型的消息.

  • 第一个GetState返回当前状态. GetState消息经常发送

  • 另一个UpdateState非常昂贵(耗时) - 例如从互联网下载某些内容然后相应地更新状态. UpdateState被称为很少.

我的问题是 - 消息GetState被阻止并等到前面UpdateState服务.这就是为什么我试图用来TryScan处理所有GetState消息,但没有运气.

我的示例代码:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                mbox.TryScan(fun m ->
                    match m with 
                    | GetState(chnl) -> 
                        printfn "G processing TryScan" …
Run Code Online (Sandbox Code Playgroud)

concurrency f# messages mailboxprocessor

5
推荐指数
1
解决办法
723
查看次数

使用Post或PostAndAsyncReply与F#的MailboxProcessor?

我已经看到了不同的片段,展示了Put一条unit用F#返回的消息MailboxProcessor.在某些情况下,只有Post方法被使用,而其他人使用PostAndAsyncReply,一旦处理消息,回复通道立即回复.在做一些测试时,我发现在等待回复时有很长的时间滞后,所以看起来除非你需要真正的回复,否则你应该使用Post.

注意:我开始在另一个帖子中询问这个问题,但认为发布完整问题很有用.在另一个帖子中,Tomas Petricek提到回复通道可以使用等待机制来确保调用者延迟直到Put消息被处理.

使用PostAndAsyncReply消息排序的帮助,还是只是强制暂停直到处理第一条消息?在性能方面Post出现了正确的解决方案.那是准确的吗?

更新:

我只想到了为什么PostAndAsyncReplyBlockingQueueAgent示例中可能需要的原因:Scan用于Get在队列已满时查找消息,因此您不希望在之前完成之前Put然后再查找消息.GetPut

concurrency f# actor mailboxprocessor

5
推荐指数
2
解决办法
915
查看次数

统一Task <T>和F#MailboxProcessor异常处理

使用Task <T>时,在Task.Wait()期间抛出任务执行期间的异常; 当使用F#的MailBoxProcessor时,异常会被吞下,需要根据这个问题明确处理.

这种差异使得很难通过Task将F#代理暴露给C#代码.例如,此代理:

type internal IncrementMessage = 
    Increment of int * AsyncReplyChannel<int>

type IncrementAgent() =
    let counter = Agent.Start(fun agent -> 
        let rec loop() = async { let! Increment(msg, replyChannel) = agent.Receive()
                                match msg with 
                                | int.MaxValue -> return! failwith "Boom!"
                                | _ as i -> replyChannel.Reply (i + 1)
                                            return! loop() }

        loop())

    member x.PostAndAsyncReply i =
        Async.StartAsTask (counter.PostAndAsyncReply (fun channel -> Increment(i, channel)))
Run Code Online (Sandbox Code Playgroud)

可以从C#调用,但异常不会返回给C#:

[Test]
public void ExceptionHandling()
{
    //
    // TPL exception behaviour
    // …
Run Code Online (Sandbox Code Playgroud)

f# mailboxprocessor

5
推荐指数
1
解决办法
439
查看次数

使用LIFO逻辑运行的MailboxProcessor

我正在学习F#agents(MailboxProcessor).

我正在处理一个非常传统的问题.

  • 我有一个代理(dataSource),它是流数据的来源.数据必须由一系列代理(dataProcessor)处理.我们可以将其dataProcessor视为某种跟踪设备.
  • 数据的流动速度可能快于dataProcessor可以处理其输入的速度.
  • 有一些延迟是可以的.但是,我必须确保代理人始终处于工作之上,并且不会在过时的观察中被堆积

我正在探索解决这个问题的方法.

一个想法是实现堆栈(LIFO)dataSource.dataSourcedataProcessor可用于接收和处理数据时,将发送最新的观察结果.该解决方案可能有效,但dataProcessor可能需要被阻止和重新激活,因此可能会变得复杂; 并传达其状态dataSource,导致双向沟通问题.这个问题可以归结为一个blocking queue消费者-生产者问题,但我不知道..

第二个想法是有 dataProcessor照顾的消息排序.在这个架构中,dataSource只需在dataProcessor队列中发布更新.dataProcessor将用于Scan获取队列中可用的最新数据.这可能是要走的路.但是,我不确定在当前的设计中MailboxProcessor是否可以清除消息队列,删除旧的过时消息.此外,在这里写道:

不幸的是,当前版本的F#中的TryScan功能有两种方式.首先,重点是指定超时,但实现实际上并不尊重它.具体而言,不相关的消息会重置计时器.其次,与其他扫描功能一样,在锁定下检查消息队列,该锁定防止任何其他线程在扫描期间发布,这可能是任意长的时间.因此,TryScan函数本身往往会锁定并发系统,甚至可能引入死锁,因为调用者的代码是在锁内部进行评估的(例如,当锁定下的代码阻塞时,从函数参数发送到Scan或TryScan会使代理死锁获得已经存在的锁定.

将最新观察结果反弹可能是一个问题.这篇文章的作者@Jon Harrop暗示了这一点

我设法围绕它进行构建,结果架构实际上更好.本质上,我热切地Receive使用我自己的本地队列来消息和过滤所有消息.

这个想法肯定值得探索,但在开始使用代码之前,我会欢迎一些关于如何构建解决方案的输入.

谢谢.

f# asynchronous agent mailboxprocessor lifo

5
推荐指数
1
解决办法
559
查看次数

通过Rx从MailboxProcessor返回结果是一个好主意吗?

我对下面的代码示例以及人们的想法有点好奇.我们的想法是从NetworkStream(~20 msg/s)读取而不是在main中工作,将事物传递给MainboxProcessor以便在完成时处理并返回绑定.

通常的方法是使用PostAndReply,但我想绑定到ListView或C#中的其他控件.必须使用LastN项目进行魔术并进行过滤.另外,Rx有一些错误处理.

下面的例子观察2..10的数字并返回"你好X".在8它停止像EOF.使其成为ToEnumerable,因为其他线程在完成之前完成,但它也适用于Subscribe.

困扰我的是什么:

  1. 在递归中传递Subject(obj).我认为其中大约有3-4个没有任何问题.好主意?
  2. 主体的一生.

open System
open System.Threading
open System.Reactive.Subjects
open System.Reactive.Linq  // NuGet, take System.Reactive.Core also.
open System.Reactive.Concurrency

type SerializedLogger() = 

    let _letters = new Subject<string>()
    // create the mailbox processor
    let agent = MailboxProcessor.Start(fun inbox -> 

        // the message processing function
        let rec messageLoop (letters:Subject<string>) = async{

            // read a message
            let! msg = inbox.Receive()

            printfn "mailbox: %d in Thread: %d" msg Thread.CurrentThread.ManagedThreadId
            do! Async.Sleep 100
            // write it to the …
Run Code Online (Sandbox Code Playgroud)

f# system.reactive mailboxprocessor

5
推荐指数
1
解决办法
245
查看次数

f#mailboxprocessor - 无需等待传递即可回复

我正在使用代理(MailboxProcessor)来进行需要响应的状态处理.

  • 呼叫者使用发布消息 MailboxProcessor.PostAndAsyncReply
  • 在代理内部,给出响应 AsyncReplyChannel.Reply

但是,我发现通过探索f#源代码,代理在响应传递之前不会处理下一条消息.总的来说这是件好事.但在我的情况下,代理更希望继续处理消息而不是等待响应传递.

做这样的事情以提供响应是否有问题?(或者有更好的选择吗?)

async { replyChannel.Reply response } |> Async.Start
Run Code Online (Sandbox Code Playgroud)

我意识到这种方法并不能保证响应按顺序传递.我没关系.

参考例子

// agent code
let doWork data =
    async { ... ; return response }

let rec loop ( inbox : MailboxProcessor<_> ) =
    async {
        let! msg = inbox.Receive()
        match msg with
        | None ->
            return ()

        | Some ( data, replyChannel ) ->
            let! response = doWork data
            replyChannel.Reply response (* waits for delivery, vs below *)
            // async { replyChannel.Reply response } …
Run Code Online (Sandbox Code Playgroud)

f# agent mailboxprocessor

5
推荐指数
1
解决办法
737
查看次数

等待邮箱处理器

是否可以等待邮箱处理器,以下代码在 F# 交互式中工作,但有没有办法在应用程序或单元测试中等待它?

[<TestMethod>]
member this.TestMailboxProcessor() =
    let mailboxProcessor = MailboxProcessor<string>.Start(fun inbox ->
        async {
            while true do
            let! msg = inbox.Receive()
            printfn "agent got message %s" msg // too late, UnitTest exits
        }
    )

    mailboxProcessor.Post "ping"
    Console.WriteLine "message posted" // I see this in the console
    Assert.IsTrue(true)
Run Code Online (Sandbox Code Playgroud)

f# mailboxprocessor

5
推荐指数
2
解决办法
604
查看次数

F# 中的事件、观察者和 MailboxProcessor 的澄清

我有一个与金融市场相连的系统,它大量使用事件。

所有代码都被构造为事件级联,其间有过滤器、聚合等。

最初系统是用 C# 编写的,然后移植到 F#(回想起来这是一个伟大的举动),C# 代码中的事件被 F# 中的事件取代,没有多加考虑。

我听说过观察者模式,但我还没有真正了解这个主题。最近,我通过一些随机浏览阅读了有关 F# 的邮箱处理器的信息。

我读过这篇文章:观察者模式与事件驱动方法之间的差异,我没有明白,但显然超过 150 人投票认为答案也不太清楚:)

在这样的文章中:https://hackernoon.com/observer-vs-pub-sub-pattern-50d3b27f838c看起来观察者模式与事件完全相同......

乍一看,他们似乎在解决同一类问题,只是接口不同,但这让我思考了两个问题:

  • 邮箱处理器真的是被使用的东西吗?它似乎主要出现在较旧的文档中,并且在我正在使用的软件包中,我还没有遇到任何使用它的情况

  • 关于观察者模式,我们使用的大量包中只有一个包在内部使用它,但其他所有东西都只是使用基本事件。

是否有适合 Observable 模式和 MailboxProcessor 的特定用例?他们有独特的功能吗?或者它们最终只是围绕事件提供语法帮助?

f# observable mailboxprocessor

5
推荐指数
1
解决办法
442
查看次数

F#MailboxProcessor限制并行性

我是F#的新手并尝试使用MailboxProcessor来确保状态更改是孤立完成的.

简而言之,我将动作(描述状态通道的不可变对象)发布到MailboxProcessor,在递归函数中我读取消息并生成新状态(即在下面的示例中将项添加到集合中)并将该状态发送到下一次递归.

open System

type AppliationState =
    {
        Store : string list
    }
    static member Default = 
        {
            Store = List.empty
        }
    member this.HandleAction (action:obj) =
        match action with
        | :? string as a -> { this with Store = a :: this.Store }
        | _ -> this

type Agent<'T> = MailboxProcessor<'T>     

[<AbstractClass; Sealed>]
type AppHolder private () =
    static member private Processor = Agent.Start(fun inbox ->
        let rec loop (s : AppliationState) =
            async {
                let! action = inbox.Receive() …
Run Code Online (Sandbox Code Playgroud)

f# multithreading agent mailboxprocessor

4
推荐指数
2
解决办法
229
查看次数