luk*_*ler 6 .net c# system.reactive
比方说,我有1000个观察者.现在我想将所有事件聚合到一个新的observable中,一旦所有其他事件发送了一个事件,它就会触发OnNext.什么是使用Rx做到这一点的最佳方法?
更新:在Rx论坛上有一些很棒的反馈,尤其是Dave Sexton.他展示了如何创建一个带有多个observable的Zip扩展方法:http://social.msdn.microsoft.com/Forums/en-US/rx/thread/daaa84db-b560-4eda-871e-e523098db20c/
F# 中有一个 MailboxProcessor...我会在 C# 中使用 SynchronizationContext 来达到相同的目的。给我几分钟,我会写一个例子。
旁白:这是我在 F# 中的代码,它执行类似的操作...这将花费更多的精力,但在带有 Rx 的 C# 中仍然可行。
open System.Diagnostics
let numWorkers = 20
let asyncDelay = 100
type MessageForMailbox =
| DataMessage of AsyncReplyChannel<unit>
| GetSummary of AsyncReplyChannel<unit>
let main =
let actor =
MailboxProcessor.Start( fun inbox ->
let rec loop acc =
async {
let! message = inbox.Receive()
match message with
| DataMessage replyChannel -> replyChannel.Reply(); return! loop acc
| GetSummary replyChannel -> replyChannel.Reply(); return! loop acc
}
loop 0 // seed for acc
)
let codeBlocks = [for i in 1..numWorkers ->
async {
do! Async.Sleep asyncDelay
return! actor.PostAndAsyncReply DataMessage
} ]
while true do
printfn "Concurrent started..."
let sw = new Stopwatch()
sw.Start()
codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore
actor.PostAndReply GetSummary
sw.Stop()
printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds
printfn "efficiency: %d%%" (int64 (asyncDelay * 100) / sw.ElapsedMilliseconds)
printfn "Synchronous started..."
let sw = new Stopwatch()
sw.Start()
for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore
sw.Stop()
printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds
printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100) / sw.ElapsedMilliseconds)
main
Run Code Online (Sandbox Code Playgroud)