分配工作量

Max*_*Max 2 f# asynchronous

在我的应用程序中,我有许多对象可以执行一些持久的计算,让我们称之为客户端.我还有许多对象,其中包含要计算的任务的描述.像这样的东西:

let clients = [1..4]
let tasks = [1..20]

let calculate c t =
    printf "Starting task %d with client %d\n" t c
    Thread.Sleep(3000)
    printf "Finished task %d with client %d\n" t c
Run Code Online (Sandbox Code Playgroud)

有了一个客户端,我一次只能启动一个任务.我想创建一个函数/类,它将任务分配给客户端并执行计算.我在C#中使用客户端队列完成了这一操作,因此只要将新任务分配给客户端,就会从队列中删除此客户端,并在计算完成后释放客户端并将其放入队列中再次.现在我有兴趣以功能的方式实现它.我试图尝试异步工作流程,但我想不出一个正确的方法来实现它.

这是一个类似F#的代码,我试图让它工作,但不能:

let rec distribute clients tasks calculate tasks_to_wait =
    match clients, tasks with
    | _ , [] -> ()           // No tasks - we're done!
    | [], th::tt ->          // No free clients, but there are still tasks to calculate.
             let released_client = ?wait_for_one? tasks_to_wait 
             distribute [released_client] tasks calculate ?tasks_to_wait?

    | ch::ct, th::tt ->      // There are free clients. 
        let task_to_wait() =
             do calculate ch th 
             ch
        distribute ct tt calculate (task_to_wait::tasks_to_wait)
Run Code Online (Sandbox Code Playgroud)

我该怎么做呢?是否有功能设计模式来解决这个任务?

Tom*_*cek 6

有多种方法可以做到这一点.使用来自F#的.NET 4.0的一些并发集合(如队列)是完全可以的,因为如果集合已经实现了您需要的功能,这通常是最容易的事情.

该问题需要并发访问某些资源,因此无法以纯粹的功能方式解决,但F#提供了代理,这为您提供了解决问题的一种不错的替代方法.

以下代码段实现了一个用于计划工作项的代理.它使用自己的邮箱来保留可用的客户端(这为您提供了一种等待下一个可用客户端的好方法).创建代理后,您只需发送所有初始客户端即可.当客户端可用时,它将继续迭代任务.当没有可用的客户端时,它将阻塞(异步 - 不阻塞线程),直到某些先前的处理完成并且客户端被发送回代理的邮箱:

let workloadAgent = MailboxProcessor.Start(fun agent -> 
  // The body of the agent, runs as a recursive loop that iterates
  // over the tasks and waits for client before it starts processing
  let rec loop tasks = async {
    match tasks with 
    | [] -> 
        // No more work to schedule (but there may be still calculation,
        // which was started earlier, going on in the background)
        ()
    | task::tasks ->
        // Wait for the next client to become available by receiving the
        // next message from the inbox - the messages are clients
        let! client = agent.Receive()
        // Spanw processing of the task using the client
        async { calculate client task
                // When the processing completes, send the client
                // back to the agent, so that it can be reused
                agent.Post(client) }
        |> Async.Start 
        // Continue processing the rest of the tasks
        return! loop tasks }

  // Start the agent with the initial list of tasks
  loop tasks )

// Add all clients to the agent, so that it can start
for client in clients do workloadAgent.Post(client)
Run Code Online (Sandbox Code Playgroud)

如果您不熟悉F#代理,那么MSDN部分的服务器端编程有一些有用的信息.