标签: mailboxprocessor

f#中长期运行的代理

我以不同的方式使用代理,其中一个由100个代理监视网站更改,并向主管报告,我可以调用它来生成新的监视器,或者收听合并的更改.这只是我的计划的一部分,我很高兴.我现在想把它拆掉,它真正独立于我的主程序运行.

(但我希望这个独立的衍生产品能够尽可能多地保留在内部,并尽可能使用最少量的胶水代码)

我有什么策略/你会推荐吗?

wcf f# agent mailboxprocessor

4
推荐指数
1
解决办法
331
查看次数

使用带有回复通道的 MailboxProcessor 来创建按顺序返回值的有限代理

基本上,我想将以下内容更改为有限的线程解决方案,因为在我的情况下,计算列表太大,产生了太多线程,我想用更少的线程来试验和测量性能。

// the trivial approach (and largely my current situation)
let doWork() = 
    [1 .. 10]
    |> List.map (fun i -> async { 
        do! Async.Sleep (100 * i)   // longest thread will run 1 sec
        return i * i                // some complex calculation returning a certain type
        })
    |> Async.Parallel
    |> Async.RunSynchronously       // works, total wall time 1s
Run Code Online (Sandbox Code Playgroud)

我的新方法,这段代码是从 Tomas Petricek这个在线片段借用/启发的(我测试过,它有效,但我需要它返回一个值,而不是单位)。

type LimitAgentMessage = 
  | Start of Async<int> *  AsyncReplyChannel<int>
  | Finished

let threadingLimitAgent limit = …
Run Code Online (Sandbox Code Playgroud)

f# multithreading asynchronous mailboxprocessor

4
推荐指数
1
解决办法
215
查看次数

F#MailboxProcessor问题

我使用http://fssnip.net/3K中的代码创建了一个控制台程序.我发现了

  1. 我将在末尾添加"System.Console.ReadLine()|> ignore"以等待线程的完成.是否有可能告诉所有MailBoxProcessors已完成并且程序可以自行退出?

  2. 我试图将测试网址"www.google.com"更改为无效的网址,我得到了以下输出.是否有可能避免"输出竞赛"?

     http://www.google.co1m crawled by agent 1.  
     AgAAAent gent 3 is done.  
     gent 2 is done.  
     5 is done.  
     gent 4 is done.  
     Agent USupervisor RL collector is done.  
     is done.  
     1 is done.

[编辑]

使用Tomas的更新http://fssnip.net/65后,最后的输出/爬行仍然终止.以下是将"limit"更改为5并添加了一些调试消息后程序的输出.最后一行显示截断的URL.它是一种检测所有爬虫是否完成执行的方法吗?

[Main] before crawl
[Crawl] before return result
http://news.google.com crawled by agent 1.
[supervisor] reached limit
http://www.gstatic.com/news/img/favicon.ico crawled by agent 5.
Agent 2 is done.
[supervisor] reached limit
Agent 5 is done.
http://www.google.com/imghp?hl=en&tab=ni crawled by agent 3.
[supervisor] reached …
Run Code Online (Sandbox Code Playgroud)

f# multithreading asynchronous mailboxprocessor

3
推荐指数
1
解决办法
683
查看次数

为什么我的邮箱处理器挂起了?

我无法弄清楚为什么下面的代码挂在调用GetTotal.我似乎无法在MailboxProcessor内部进行调试,因此很难看到发生了什么.

module Aggregator

open System

type Message<'T, 'TState> =
    | Aggregate of 'T
    | GetTotal of AsyncReplyChannel<'TState>

type Aggregator<'T, 'TState>(initialState, f) =
    let myAgent = new MailboxProcessor<Message<'T, 'TState>>(fun inbox ->
        let rec loop agg =
            async {
                let! message = inbox.Receive()
                match message with
                    | Aggregate x -> return! loop (f agg x)
                    | GetTotal replyChannel ->
                        replyChannel.Reply(agg)
                        return! loop agg
            }
        loop initialState
        )

    member m.Aggregate x = myAgent.Post(Aggregate(x))
    member m.GetTotal = myAgent.PostAndReply(fun replyChannel -> GetTotal(replyChannel))

let …
Run Code Online (Sandbox Code Playgroud)

f# mailboxprocessor

3
推荐指数
1
解决办法
316
查看次数

创建只有一些消息得到响应的代理?

是否可以创建仅在有时发布回复的邮箱代理?从它的外观来看,在我看来,如果你想发布回复,你必须总是发送一个异步回复频道.

对于我的用例,我真的希望能够灵活地将一些消息只需要传递给代理,而其他消息我想要获得同步或异步回复.

f# mailboxprocessor

3
推荐指数
1
解决办法
411
查看次数

具有多个递归异步体的F#代理可以在每个中使用多个inbox.Receive()吗?

我在这里有一个多状态F#MailboxProcessor示例,只是想知道它为什么编译但行为是意外的 - F#代理只能在传入的lambda函数中有一个inbox.Receive()语句吗?我试图遵循"专家F#3.0"页面284中提供的一般示例模式,其中使用多个异步{}实体允许多个状态,但是没有具体说明是否可以使用inbox.Receive()在每个异步?

open System

let mb1<'T> = MailboxProcessor<string>.Start(fun inbox ->
                        let rec loop1 (n:int) = async {
                                    printfn "loop1 entry "
                                    let! msg = inbox.Receive()
                                    do! Async.Sleep(1000)
                                    printfn "loop1 calling loop2"  //msg received %A" msg
                                    return! loop2 (n+1) }

                        and loop2 (x:int) =     async {
                                    printfn "loop2 entry"
                                    let! msg2 = inbox.Receive()
                                    printfn "loop2 msg received %A" msg2
                                    printfn "loop2 calling loop1"
                                    return! loop1 (x+1) }
        loop2 0                    
                                        )

mb1.Post("data message 1")
mb1.Post("data message 2")
Run Code Online (Sandbox Code Playgroud)

产量

loop2 entry
loop2 …
Run Code Online (Sandbox Code Playgroud)

f# agents mailboxprocessor

3
推荐指数
1
解决办法
79
查看次数

F#事件在Async工作流中不起作用

我想对代理人进行Post-Fire-Reply.基本上,代理触发事件然后回复调用者.但是,我要么继续收到超时错误,要么事件无法正确触发.我尝试过Post-Fire,它停止了超时错误,但事件没有触发.

let evt = new Event<int>()
let stream = evt.Publish

type Agent<'T> = MailboxProcessor<'T>
type Fire = Fire of int

let agent = Agent.Start(fun inbox -> 
    let rec loop() = async {
        let! msg = inbox.Receive()
        let (Fire i) = msg
        evt.Trigger i }
    loop())

let on i fn = 
    stream 
    |> Observable.filter (fun x -> x = i) 
    |> Observable.filter (fun x -> x <> 1)
    |> Observable.subscribe (fun x -> fn x) 

let rec collatz n =
    printfn …
Run Code Online (Sandbox Code Playgroud)

events f# asynchronous agents mailboxprocessor

3
推荐指数
1
解决办法
147
查看次数

为什么这个F#代码在与MailboxProcessor一起使用时不会产生预期的输出?

我正在浏览Don Syme的一篇博客文章,其中包括F#:Agents中的Async和Parallel Design Patterns.但是,以下看似非常简单的代码没有按预期生成输出.

type Agent<'T> = MailboxProcessor<'T>

let agent =
   Agent.Start(fun inbox ->
     async { while true do
               let! msg = inbox.Receive()
               printfn "got message '%s'" msg } )

for i in 1 .. 10000 do
   agent.Post (sprintf "message %d" i)
Run Code Online (Sandbox Code Playgroud)

而不是预期的10,000条消息,我只使用Ubuntu下的Mono 2.8.1获得大约3000条消息,或者在Windows XP下使用Visual F#获得15条消息.我在这里错过了什么吗?顺便说一句,我试图用以下文件操作替换printfn语句,最后得到相同的部分结果.

open System.IO
type Agent<'T> = MailboxProcessor<'T>

let agent =
   Agent.Start(fun inbox ->
     async { while true do
               let! msg = inbox.Receive()
               use logger = new StreamWriter("a.log", true)
               logger.WriteLine("got message '{0}'", msg.ToString())
               logger.Close() …
Run Code Online (Sandbox Code Playgroud)

f# asynchronous agent mailboxprocessor

2
推荐指数
1
解决办法
236
查看次数

正确的方法如何使用响应式UI在异步可取消工作流中准备数据

这个问题基于Async.TryCancelled不适用于看起来很复杂的Async.RunSynchronously,因此我将剪切一个我试图解决的简单部分.

假设我有这个功能:

let prepareModel () = 
    async {
        // this might take a lot of time (1-50seconds)
        let! a = ...
        let! b = ...
        let! res = combine a b
        return res
    }
 let updateUI model =
    runOnUIThread model
Run Code Online (Sandbox Code Playgroud)

prepareModel准备应该向用户显示的数据.updateUI刷新UI(删除旧控件并根据新数据创建新的ctl).

问题:如何调用这两个函数以便prepareModel随时可以取消?

流程是

  • 用户点击刷新
    • prepareModel(1)启动并异步运行,因此UI响应迅速,用户可以使用该应用程序
  • 用户更改数据并再次单击刷新
    • prepareModel(1)取消取消,新prepareModel(2)开始
  • 用户更改数据并再次单击刷新
    • prepareModel(2)取消,新prepareModel(3)开始
    • ..
    • prepareModel(n)完成了
    • updateUI 在UI线程上运行,重绘UI

(我的第一个解决方案基于MailboxProcessor确保只prepareModel执行一个,请参阅Async.TryCancelled不能与Async.RunSynchron一起使用,但是当我试验这个时,它不是没有错误的)

user-interface f# asynchronous mailboxprocessor

1
推荐指数
1
解决办法
220
查看次数