我正在寻找一个长期运行服务(使用F#)的内存泄漏.到目前为止,我见过的唯一"奇怪"的事情如下:

我在代码中看不到任何这种行为的原因(你可以在每个邮箱示例中找到你的标准代码 - 只有一个带有a let! = receive和a 的循环match结束了return! loop()
有没有人见过这种行为,甚至知道如何处理这个?或者这甚至是一个(已知的)错误?
更新:数组的增长真的很奇怪 - 似乎附加了额外的空间而没有正确使用:

我想知道,为什么MailboxProcessor处理异常的默认策略只是默默地忽略它们.例如:
let counter =
MailboxProcessor.Start(fun inbox ->
let rec loop() =
async { printfn "waiting for data..."
let! data = inbox.Receive()
failwith "fail" // simulate throwing of an exception
printfn "Got: %d" data
return! loop()
}
loop ())
()
counter.Post(42)
counter.Post(43)
counter.Post(44)
Async.Sleep 1000 |> Async.RunSynchronously
Run Code Online (Sandbox Code Playgroud)
没有任何反应.程序执行没有致命的停止,或者出现带有"未处理的异常"的消息框.没有.
如果有人使用PostAndReply方法,这种情况会变得更糟:结果是保证死锁.
这种行为有什么理由吗?
您是否尝试过使用C#的T的MailboxProcessor?你能发贴示例代码吗?
你如何开始一个新的,发布消息,以及如何处理它们?
看着:
member this.PostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> 'Reply
Run Code Online (Sandbox Code Playgroud)
我无法弄清楚为什么签名看起来对我来说非常直观.我们想要做的是向代理发布消息,然后等待回复.为什么我们必须给他一个奇怪的功能作为'信息'?
再次看到这个MSDN片段:
let rec loop() =
printf "> "
let input = Console.ReadLine()
printThreadId("Console loop")
let reply = agent.PostAndReply(fun replyChannel -> input, replyChannel)
if (reply <> "Stopping.") then
printfn "Reply: %s" reply
loop()
else
()
loop()
Run Code Online (Sandbox Code Playgroud)
我更喜欢这样的东西:
member this.PostAndReply : 'Msg * ?int -> 'Reply
Run Code Online (Sandbox Code Playgroud)
谢谢
我正在使用MailboxProcessor类来保持独立的代理人做自己的事情.通常,代理可以在同一个进程中相互通信,但我希望代理在处于不同的进程或甚至不同的机器上时能够相互通信.什么样的机制最适合实现它们之间的通信?有一些标准的解决方案吗?
请注意,我正在使用Ubuntu实例来运行代理.
我有一个代理,我设置在后台做一些数据库工作.实现看起来像这样:
let myAgent = MailboxProcessor<AgentData>.Start(fun inbox ->
let rec loop =
async {
let! data = inbox.Receive()
use conn = new System.Data.SqlClient.SqlConnection("...")
data |> List.map (fun e -> // Some transforms)
|> List.sortBy (fun (_,_,t,_,_) -> t)
|> List.iter (fun (a,b,c,d,e) ->
try
... // Do the database work
with e -> Log.error "Yikes")
return! loop
}
loop)
Run Code Online (Sandbox Code Playgroud)
有了这个,我发现如果在一段时间内多次调用它,我会开始使SqlConnection对象堆积而不是被丢弃,最终我会在连接池中用尽连接(我没有确切的指标)有多少"几个",但连续两次运行集成测试套件总是会导致连接池运行干燥).
如果我use改为a,using则事情处理得当,我没有问题:
let myAgent = MailboxProcessor<AgentData>.Start(fun inbox ->
let rec loop =
async {
let! data = inbox.Receive() …Run Code Online (Sandbox Code Playgroud) 我试图在F#中测试一个MailboxProcessor.我想测试我发布的函数f实际上是在发布消息时执行的.
原始代码使用的是Xunit,但我创建了一个fsx,我可以使用fsharpi执行它.
到目前为止我这样做:
open System
open FSharp
open System.Threading
open System.Threading.Tasks
module MyModule =
type Agent<'a> = MailboxProcessor<'a>
let waitingFor timeOut (v:'a)=
let cts = new CancellationTokenSource(timeOut|> int)
let tcs = new TaskCompletionSource<'a>()
cts.Token.Register(fun (_) -> tcs.SetCanceled()) |> ignore
tcs ,Async.AwaitTask tcs.Task
type MyProcessor<'a>(f:'a->unit) =
let agent = Agent<'a>.Start(fun inbox ->
let rec loop() = async {
let! msg = inbox.Receive()
// some more complex should be used here
f msg
return! loop()
}
loop()
)
member this.Post(msg:'a) =
agent.Post …Run Code Online (Sandbox Code Playgroud) 如果状态被认为是函数的坏主意,为什么在使用MailboxProcessor时它被视为没有状态?
为了扩展,我向某人解释函数式编程,函数如何不使用状态(函数外部没有变量 - 即相同数据的相同数据)以及这带来的好处.但后来我开始考虑使用MailboxProcessor以及它使用递归来在函数调用之间保持状态的方式,而且我不能完全调和为什么在这种情况下它是可以的.
这是持久状态最不好的方式吗?
我试图把它减少到尽可能小的重复,但它仍然有点长,我道歉.
我有一个F#项目引用一个C#项目,代码如下所示.
public static class CSharpClass {
public static async Task AsyncMethod(CancellationToken cancellationToken) {
await Task.Delay(3000);
cancellationToken.ThrowIfCancellationRequested();
}
}
Run Code Online (Sandbox Code Playgroud)
这是F#代码.
type Message =
| Work of CancellationToken
| Quit of AsyncReplyChannel<unit>
let mkAgent() = MailboxProcessor.Start <| fun inbox ->
let rec loop() = async {
let! msg = inbox.TryReceive(250)
match msg with
| Some (Work cancellationToken) ->
let! result =
CSharpClass.AsyncMethod(cancellationToken)
|> Async.AwaitTask
|> Async.Catch
// THIS POINT IS NEVER REACHED AFTER CANCELLATION
match result with
| Choice1Of2 _ -> …Run Code Online (Sandbox Code Playgroud) 当其 MailboxProcessor 被处理(或以其他方式停止)时,是否可以让 PostAndAsyncReply 立即返回?或者是否有一些关于如何安全地使用 PostAndReply 方法而不造成死锁的“模式”/最佳实践?
现在我遇到的问题是 PostAndAsyncReply 在 MailboxProcessor 被处理后永远不会返回。使用 timeout 参数不是一种选择,因为我迫不及待(此外,选择合理的超时非常困难或不可能,因为它取决于太多因素)。
[<Test>]
let ``waiting for a reply from a disposed agent``() =
use server = MailboxProcessor.Start(fun inbox -> async {
()
})
(server :> System.IDisposable).Dispose()
server.PostAndReply (fun reply -> reply) // <- deadlock
|> ignore)
Run Code Online (Sandbox Code Playgroud)
编辑:我见过的大多数邮箱处理器示例(包括 MSDN 上的示例)甚至不介意处理邮箱处理器。并且 MSDN 没有解释 MailboxProcessors 在被处理时如何反应。没有必要处置它们吗?
f# ×10
mailboxprocessor ×10
asynchronous ×2
agent ×1
agents ×1
c#-4.0 ×1
c#-to-f# ×1
cancellation ×1
concurrency ×1
memory-leaks ×1
messaging ×1
task ×1
unit-testing ×1