我以不同的方式使用代理,其中一个由100个代理监视网站更改,并向主管报告,我可以调用它来生成新的监视器,或者收听合并的更改.这只是我的计划的一部分,我很高兴.我现在想把它拆掉,它真正独立于我的主程序运行.
(但我希望这个独立的衍生产品能够尽可能多地保留在内部,并尽可能使用最少量的胶水代码)
我有什么策略/你会推荐吗?
基本上,我想将以下内容更改为有限的线程解决方案,因为在我的情况下,计算列表太大,产生了太多线程,我想用更少的线程来试验和测量性能。
// the trivial approach (and largely my current situation)
let doWork() =
[1 .. 10]
|> List.map (fun i -> async {
do! Async.Sleep (100 * i) // longest thread will run 1 sec
return i * i // some complex calculation returning a certain type
})
|> Async.Parallel
|> Async.RunSynchronously // works, total wall time 1s
Run Code Online (Sandbox Code Playgroud)
我的新方法,这段代码是从 Tomas Petricek的这个在线片段借用/启发的(我测试过,它有效,但我需要它返回一个值,而不是单位)。
type LimitAgentMessage =
| Start of Async<int> * AsyncReplyChannel<int>
| Finished
let threadingLimitAgent limit = …Run Code Online (Sandbox Code Playgroud) 我使用http://fssnip.net/3K中的代码创建了一个控制台程序.我发现了
我将在末尾添加"System.Console.ReadLine()|> ignore"以等待线程的完成.是否有可能告诉所有MailBoxProcessors已完成并且程序可以自行退出?
我试图将测试网址"www.google.com"更改为无效的网址,我得到了以下输出.是否有可能避免"输出竞赛"?
http://www.google.co1m crawled by agent 1.
AgAAAent gent 3 is done.
gent 2 is done.
5 is done.
gent 4 is done.
Agent USupervisor RL collector is done.
is done.
1 is done.
使用Tomas的更新http://fssnip.net/65后,最后的输出/爬行仍然终止.以下是将"limit"更改为5并添加了一些调试消息后程序的输出.最后一行显示截断的URL.它是一种检测所有爬虫是否完成执行的方法吗?
[Main] before crawl
[Crawl] before return result
http://news.google.com crawled by agent 1.
[supervisor] reached limit
http://www.gstatic.com/news/img/favicon.ico crawled by agent 5.
Agent 2 is done.
[supervisor] reached limit
Agent 5 is done.
http://www.google.com/imghp?hl=en&tab=ni crawled by agent 3.
[supervisor] reached …Run Code Online (Sandbox Code Playgroud) 我无法弄清楚为什么下面的代码挂在调用GetTotal.我似乎无法在MailboxProcessor内部进行调试,因此很难看到发生了什么.
module Aggregator
open System
type Message<'T, 'TState> =
| Aggregate of 'T
| GetTotal of AsyncReplyChannel<'TState>
type Aggregator<'T, 'TState>(initialState, f) =
let myAgent = new MailboxProcessor<Message<'T, 'TState>>(fun inbox ->
let rec loop agg =
async {
let! message = inbox.Receive()
match message with
| Aggregate x -> return! loop (f agg x)
| GetTotal replyChannel ->
replyChannel.Reply(agg)
return! loop agg
}
loop initialState
)
member m.Aggregate x = myAgent.Post(Aggregate(x))
member m.GetTotal = myAgent.PostAndReply(fun replyChannel -> GetTotal(replyChannel))
let …Run Code Online (Sandbox Code Playgroud) 是否可以创建仅在有时发布回复的邮箱代理?从它的外观来看,在我看来,如果你想发布回复,你必须总是发送一个异步回复频道.
对于我的用例,我真的希望能够灵活地将一些消息只需要传递给代理,而其他消息我想要获得同步或异步回复.
我在这里有一个多状态F#MailboxProcessor示例,只是想知道它为什么编译但行为是意外的 - F#代理只能在传入的lambda函数中有一个inbox.Receive()语句吗?我试图遵循"专家F#3.0"页面284中提供的一般示例模式,其中使用多个异步{}实体允许多个状态,但是没有具体说明是否可以使用inbox.Receive()在每个异步?
open System
let mb1<'T> = MailboxProcessor<string>.Start(fun inbox ->
let rec loop1 (n:int) = async {
printfn "loop1 entry "
let! msg = inbox.Receive()
do! Async.Sleep(1000)
printfn "loop1 calling loop2" //msg received %A" msg
return! loop2 (n+1) }
and loop2 (x:int) = async {
printfn "loop2 entry"
let! msg2 = inbox.Receive()
printfn "loop2 msg received %A" msg2
printfn "loop2 calling loop1"
return! loop1 (x+1) }
loop2 0
)
mb1.Post("data message 1")
mb1.Post("data message 2")
Run Code Online (Sandbox Code Playgroud)
产量
loop2 entry
loop2 …Run Code Online (Sandbox Code Playgroud) 我想对代理人进行Post-Fire-Reply.基本上,代理触发事件然后回复调用者.但是,我要么继续收到超时错误,要么事件无法正确触发.我尝试过Post-Fire,它停止了超时错误,但事件没有触发.
let evt = new Event<int>()
let stream = evt.Publish
type Agent<'T> = MailboxProcessor<'T>
type Fire = Fire of int
let agent = Agent.Start(fun inbox ->
let rec loop() = async {
let! msg = inbox.Receive()
let (Fire i) = msg
evt.Trigger i }
loop())
let on i fn =
stream
|> Observable.filter (fun x -> x = i)
|> Observable.filter (fun x -> x <> 1)
|> Observable.subscribe (fun x -> fn x)
let rec collatz n =
printfn …Run Code Online (Sandbox Code Playgroud) 我正在浏览Don Syme的一篇博客文章,其中包括F#:Agents中的Async和Parallel Design Patterns.但是,以下看似非常简单的代码没有按预期生成输出.
type Agent<'T> = MailboxProcessor<'T>
let agent =
Agent.Start(fun inbox ->
async { while true do
let! msg = inbox.Receive()
printfn "got message '%s'" msg } )
for i in 1 .. 10000 do
agent.Post (sprintf "message %d" i)
Run Code Online (Sandbox Code Playgroud)
而不是预期的10,000条消息,我只使用Ubuntu下的Mono 2.8.1获得大约3000条消息,或者在Windows XP下使用Visual F#获得15条消息.我在这里错过了什么吗?顺便说一句,我试图用以下文件操作替换printfn语句,最后得到相同的部分结果.
open System.IO
type Agent<'T> = MailboxProcessor<'T>
let agent =
Agent.Start(fun inbox ->
async { while true do
let! msg = inbox.Receive()
use logger = new StreamWriter("a.log", true)
logger.WriteLine("got message '{0}'", msg.ToString())
logger.Close() …Run Code Online (Sandbox Code Playgroud) 这个问题基于Async.TryCancelled不适用于看起来很复杂的Async.RunSynchronously,因此我将剪切一个我试图解决的简单部分.
假设我有这个功能:
let prepareModel () =
async {
// this might take a lot of time (1-50seconds)
let! a = ...
let! b = ...
let! res = combine a b
return res
}
let updateUI model =
runOnUIThread model
Run Code Online (Sandbox Code Playgroud)
prepareModel准备应该向用户显示的数据.updateUI刷新UI(删除旧控件并根据新数据创建新的ctl).
问题:如何调用这两个函数以便prepareModel随时可以取消?
流程是
prepareModel(1)启动并异步运行,因此UI响应迅速,用户可以使用该应用程序prepareModel(1)取消取消,新prepareModel(2)开始prepareModel(2)取消,新prepareModel(3)开始prepareModel(n)完成了updateUI 在UI线程上运行,重绘UI(我的第一个解决方案基于MailboxProcessor确保只prepareModel执行一个,请参阅Async.TryCancelled不能与Async.RunSynchron一起使用,但是当我试验这个时,它不是没有错误的)