当其 MailboxProcessor 被处理(或以其他方式停止)时,是否可以让 PostAndAsyncReply 立即返回?或者是否有一些关于如何安全地使用 PostAndReply 方法而不造成死锁的“模式”/最佳实践?
现在我遇到的问题是 PostAndAsyncReply 在 MailboxProcessor 被处理后永远不会返回。使用 timeout 参数不是一种选择,因为我迫不及待(此外,选择合理的超时非常困难或不可能,因为它取决于太多因素)。
[<Test>]
let ``waiting for a reply from a disposed agent``() =
use server = MailboxProcessor.Start(fun inbox -> async {
()
})
(server :> System.IDisposable).Dispose()
server.PostAndReply (fun reply -> reply) // <- deadlock
|> ignore)
Run Code Online (Sandbox Code Playgroud)
编辑:我见过的大多数邮箱处理器示例(包括 MSDN 上的示例)甚至不介意处理邮箱处理器。并且 MSDN 没有解释 MailboxProcessors 在被处理时如何反应。没有必要处置它们吗?
我试图找到一个关于如何使用的例子TryScan,但没有找到任何,你能帮助我吗?
我想做什么(非常简单的例子):我有一个MailboxProcessor接受两种类型的消息.
第一个GetState返回当前状态.
GetState消息经常发送
另一个UpdateState非常昂贵(耗时) - 例如从互联网下载某些内容然后相应地更新状态.
UpdateState被称为很少.
我的问题是 - 消息GetState被阻止并等到前面UpdateState服务.这就是为什么我试图用来TryScan处理所有GetState消息,但没有运气.
我的示例代码:
type Msg = GetState of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
let rec loop state = async {
// this TryScan doesn't work as expected
// it should process GetState messages and then continue
mbox.TryScan(fun m ->
match m with
| GetState(chnl) ->
printfn "G processing TryScan" …Run Code Online (Sandbox Code Playgroud) 我已经看到了不同的片段,展示了Put一条unit用F#返回的消息MailboxProcessor.在某些情况下,只有Post方法被使用,而其他人使用PostAndAsyncReply,一旦处理消息,回复通道立即回复.在做一些测试时,我发现在等待回复时有很长的时间滞后,所以看起来除非你需要真正的回复,否则你应该使用Post.
注意:我开始在另一个帖子中询问这个问题,但认为发布完整问题很有用.在另一个帖子中,Tomas Petricek提到回复通道可以使用等待机制来确保调用者延迟直到Put消息被处理.
使用PostAndAsyncReply消息排序的帮助,还是只是强制暂停直到处理第一条消息?在性能方面Post出现了正确的解决方案.那是准确的吗?
更新:
我只想到了为什么PostAndAsyncReply在BlockingQueueAgent示例中可能需要的原因:Scan用于Get在队列已满时查找消息,因此您不希望在之前完成之前Put然后再查找消息.GetPut
使用Task <T>时,在Task.Wait()期间抛出任务执行期间的异常; 当使用F#的MailBoxProcessor时,异常会被吞下,需要根据这个问题明确处理.
这种差异使得很难通过Task将F#代理暴露给C#代码.例如,此代理:
type internal IncrementMessage =
Increment of int * AsyncReplyChannel<int>
type IncrementAgent() =
let counter = Agent.Start(fun agent ->
let rec loop() = async { let! Increment(msg, replyChannel) = agent.Receive()
match msg with
| int.MaxValue -> return! failwith "Boom!"
| _ as i -> replyChannel.Reply (i + 1)
return! loop() }
loop())
member x.PostAndAsyncReply i =
Async.StartAsTask (counter.PostAndAsyncReply (fun channel -> Increment(i, channel)))
Run Code Online (Sandbox Code Playgroud)
可以从C#调用,但异常不会返回给C#:
[Test]
public void ExceptionHandling()
{
//
// TPL exception behaviour
// …Run Code Online (Sandbox Code Playgroud) 我正在学习F#agents(MailboxProcessor).
我正在处理一个非常传统的问题.
dataSource),它是流数据的来源.数据必须由一系列代理(dataProcessor)处理.我们可以将其dataProcessor视为某种跟踪设备.dataProcessor可以处理其输入的速度.我正在探索解决这个问题的方法.
第一个想法是实现堆栈(LIFO)dataSource.dataSource当dataProcessor可用于接收和处理数据时,将发送最新的观察结果.该解决方案可能有效,但dataProcessor可能需要被阻止和重新激活,因此可能会变得复杂; 并传达其状态dataSource,导致双向沟通问题.这个问题可以归结为一个blocking queue在消费者-生产者问题,但我不知道..
在第二个想法是有 dataProcessor照顾的消息排序.在这个架构中,dataSource只需在dataProcessor队列中发布更新.dataProcessor将用于Scan获取队列中可用的最新数据.这可能是要走的路.但是,我不确定在当前的设计中MailboxProcessor是否可以清除消息队列,删除旧的过时消息.此外,在这里写道:
不幸的是,当前版本的F#中的TryScan功能有两种方式.首先,重点是指定超时,但实现实际上并不尊重它.具体而言,不相关的消息会重置计时器.其次,与其他扫描功能一样,在锁定下检查消息队列,该锁定防止任何其他线程在扫描期间发布,这可能是任意长的时间.因此,TryScan函数本身往往会锁定并发系统,甚至可能引入死锁,因为调用者的代码是在锁内部进行评估的(例如,当锁定下的代码阻塞时,从函数参数发送到Scan或TryScan会使代理死锁获得已经存在的锁定.
将最新观察结果反弹可能是一个问题.这篇文章的作者@Jon Harrop暗示了这一点
我设法围绕它进行构建,结果架构实际上更好.本质上,我热切地
Receive使用我自己的本地队列来消息和过滤所有消息.
这个想法肯定值得探索,但在开始使用代码之前,我会欢迎一些关于如何构建解决方案的输入.
谢谢.
我对下面的代码示例以及人们的想法有点好奇.我们的想法是从NetworkStream(~20 msg/s)读取而不是在main中工作,将事物传递给MainboxProcessor以便在完成时处理并返回绑定.
通常的方法是使用PostAndReply,但我想绑定到ListView或C#中的其他控件.必须使用LastN项目进行魔术并进行过滤.另外,Rx有一些错误处理.
下面的例子观察2..10的数字并返回"你好X".在8它停止像EOF.使其成为ToEnumerable,因为其他线程在完成之前完成,但它也适用于Subscribe.
困扰我的是什么:
open System
open System.Threading
open System.Reactive.Subjects
open System.Reactive.Linq // NuGet, take System.Reactive.Core also.
open System.Reactive.Concurrency
type SerializedLogger() =
let _letters = new Subject<string>()
// create the mailbox processor
let agent = MailboxProcessor.Start(fun inbox ->
// the message processing function
let rec messageLoop (letters:Subject<string>) = async{
// read a message
let! msg = inbox.Receive()
printfn "mailbox: %d in Thread: %d" msg Thread.CurrentThread.ManagedThreadId
do! Async.Sleep 100
// write it to the …Run Code Online (Sandbox Code Playgroud) 我正在使用代理(MailboxProcessor)来进行需要响应的状态处理.
MailboxProcessor.PostAndAsyncReplyAsyncReplyChannel.Reply但是,我发现通过探索f#源代码,代理在响应传递之前不会处理下一条消息.总的来说这是件好事.但在我的情况下,代理更希望继续处理消息而不是等待响应传递.
做这样的事情以提供响应是否有问题?(或者有更好的选择吗?)
async { replyChannel.Reply response } |> Async.Start
Run Code Online (Sandbox Code Playgroud)
我意识到这种方法并不能保证响应按顺序传递.我没关系.
参考例子
// agent code
let doWork data =
async { ... ; return response }
let rec loop ( inbox : MailboxProcessor<_> ) =
async {
let! msg = inbox.Receive()
match msg with
| None ->
return ()
| Some ( data, replyChannel ) ->
let! response = doWork data
replyChannel.Reply response (* waits for delivery, vs below *)
// async { replyChannel.Reply response } …Run Code Online (Sandbox Code Playgroud) 是否可以等待邮箱处理器,以下代码在 F# 交互式中工作,但有没有办法在应用程序或单元测试中等待它?
[<TestMethod>]
member this.TestMailboxProcessor() =
let mailboxProcessor = MailboxProcessor<string>.Start(fun inbox ->
async {
while true do
let! msg = inbox.Receive()
printfn "agent got message %s" msg // too late, UnitTest exits
}
)
mailboxProcessor.Post "ping"
Console.WriteLine "message posted" // I see this in the console
Assert.IsTrue(true)
Run Code Online (Sandbox Code Playgroud) 我有一个与金融市场相连的系统,它大量使用事件。
所有代码都被构造为事件级联,其间有过滤器、聚合等。
最初系统是用 C# 编写的,然后移植到 F#(回想起来这是一个伟大的举动),C# 代码中的事件被 F# 中的事件取代,没有多加考虑。
我听说过观察者模式,但我还没有真正了解这个主题。最近,我通过一些随机浏览阅读了有关 F# 的邮箱处理器的信息。
我读过这篇文章:观察者模式与事件驱动方法之间的差异,我没有明白,但显然超过 150 人投票认为答案也不太清楚:)
在这样的文章中:https://hackernoon.com/observer-vs-pub-sub-pattern-50d3b27f838c看起来观察者模式与事件完全相同......
乍一看,他们似乎在解决同一类问题,只是接口不同,但这让我思考了两个问题:
邮箱处理器真的是被使用的东西吗?它似乎主要出现在较旧的文档中,并且在我正在使用的软件包中,我还没有遇到任何使用它的情况
关于观察者模式,我们使用的大量包中只有一个包在内部使用它,但其他所有东西都只是使用基本事件。
是否有适合 Observable 模式和 MailboxProcessor 的特定用例?他们有独特的功能吗?或者它们最终只是围绕事件提供语法帮助?
我是F#的新手并尝试使用MailboxProcessor来确保状态更改是孤立完成的.
简而言之,我将动作(描述状态通道的不可变对象)发布到MailboxProcessor,在递归函数中我读取消息并生成新状态(即在下面的示例中将项添加到集合中)并将该状态发送到下一次递归.
open System
type AppliationState =
{
Store : string list
}
static member Default =
{
Store = List.empty
}
member this.HandleAction (action:obj) =
match action with
| :? string as a -> { this with Store = a :: this.Store }
| _ -> this
type Agent<'T> = MailboxProcessor<'T>
[<AbstractClass; Sealed>]
type AppHolder private () =
static member private Processor = Agent.Start(fun inbox ->
let rec loop (s : AppliationState) =
async {
let! action = inbox.Receive() …Run Code Online (Sandbox Code Playgroud) f# ×10
mailboxprocessor ×10
agent ×3
concurrency ×2
actor ×1
asynchronous ×1
lifo ×1
messages ×1
observable ×1