Gre*_*bin 2 queue multithreading go
我有一个接收任务并将它们放入通道的函数。每个任务都有 ID、一些属性和一个放置结果的通道。看起来像这样
task.Result = make(chan *TaskResult)
queue <- task
result := <-task.Result
sendReponse(result)
Run Code Online (Sandbox Code Playgroud)
另一个 goroutine 从通道中获取一个任务,对其进行处理并将结果放入任务的通道中
task := <-queue
task.Result <- doExpensiveComputation(task)
Run Code Online (Sandbox Code Playgroud)
这段代码工作正常。但现在我想在queue. 任务处理是一个非常昂贵的操作,所以我想一次处理队列中具有相同 ID 的所有任务。我看到了两种方法。
第一个是不要将具有相同 ID 的任务放入队列,因此当现有任务到达时,它将等待它的副本完成。这是伪代码
if newTask in queue {
existing := queue.getById(newTask.ID)
existing.waitForComplete()
sendResponse(existing.ProcessingResult)
} else {
queue.enqueue(newTask)
}
Run Code Online (Sandbox Code Playgroud)
因此,我可以使用 go channel 和 map 进行随机访问 + 一些同步方式(如互斥锁)来实现它。我不喜欢这种方式的一点是我必须在代码中同时携带地图和频道,并保持它们的内容同步。
第二种方式是将所有任务放入队列,但是当结果到达时从队列中提取任务和所有具有相同ID的任务,然后将结果发送给所有任务。这是伪代码
someTask := queue.dequeue()
result := doExpensiveComputation(someTask)
someTask.Result <- result
moreTasks := queue.getAllWithID(someTask.ID)
for _,theSameTask := range moreTasks {
theSameTask.Result <- result
}
Run Code Online (Sandbox Code Playgroud)
我知道如何以与上述相同的方式使用 chan + map + mutex 来实现这一点。
问题是:是否有一些内置/现有数据结构可以用于解决此类问题?有没有其他(更好的)方法可以做到这一点?
如果我正确理解了这个问题,我想到的最简单的解决方案是在任务发送者(放入queue)和工作人员(从 中取出queue)之间添加一个中间层。这可能是例行程序,负责存储当前任务(按 ID)并将结果广播到每个匹配的任务。
伪代码:
go func() {
active := make(map[TaskID][]Task)
for {
select {
case task := <-queue:
tasks := active[task.ID]
// No tasks with such ID, start heavy work
if len(tasks) == 0 {
worker <- task
}
// Save task for the result
active[task.ID] = append(active[task.ID], task)
case r := <-response:
// Broadcast to all tasks
for _, task := range active[r.ID] {
task.Result <- r.Result
}
}
}
}()
Run Code Online (Sandbox Code Playgroud)
不需要互斥体,也可能不需要携带任何东西,工作人员只需要将所有结果放入这个中间层,然后正确路由响应。如果冲突的 ID 有可能相隔一段时间到达,您甚至可以轻松地在此处添加缓存。
编辑:我有一个梦想,上面的代码导致了死锁。如果您一次发送大量请求并阻塞worker通道,则存在一个严重的问题——这个中间层例程卡在worker <- task等待工作线程完成,但所有工作线程可能会在发送到响应通道时被阻塞(因为我们的例程可以“收集它)。可玩证明。
可以考虑在通道中添加一些缓冲区,但这不是一个合适的解决方案(除非您可以将系统设计为缓冲区永远不会填满)。有几种方法可以解决这个问题;例如,您可以运行一个单独的例程来收集响应,但是您需要active使用互斥锁保护映射。可行。您还可以放入worker <- task一个选择中,它会尝试向工作人员发送任务、接收新任务(如果没有要发送的内容)或收集响应。可以利用 nil 通道从未准备好进行通信(被 select 忽略)这一事实,因此您可以在单个 select 中交替接收和发送任务。例子:
go func() {
var next Task // received task which needs to be passed to a worker
in := queue // incoming channel (new tasks) -- active
var out chan Task // outgoing channel (to workers) -- inactive
for {
select {
case t := <-in:
next = t // store task, so we can pass to worker
in, out = nil, worker // deactivate incoming channel, activate outgoing
case out <- next:
in, out = queue, nil // deactivate outgoing channel, activate incoming
case r := <-response:
collect <- r
}
}
}()
Run Code Online (Sandbox Code Playgroud)