Eng*_*r81 5 concurrency channel go goroutine
我们有一个流程,用户可以在该流程中请求从源中获取文件。该来源不是最可靠的,因此我们使用Amazon SQS实现了队列。我们将下载URL放入队列,然后使用在Go中编写的一个小应用程序对其进行轮询。这个应用程序只是检索消息,下载文件,然后将其推送到我们存储它的S3。完成所有这些操作后,它将回调服务,该服务将通过电子邮件向用户发送电子邮件,告知他们文件已准备就绪。
最初,我编写此代码是为了创建n个通道,然后为每个通道附加1个例行程序,并使该例行程序处于无限循环中。这样,我可以确保一次只能处理固定数量的下载。
我意识到这不是应该使用通道的方式,如果我现在正确理解的话,实际上应该有一个通道在该通道上接收n个例程。每个go例程都处于无限循环中,等待一条消息,当它收到消息时,它将处理数据,执行应有的所有操作,完成后将等待下一条消息。这样可以确保我一次只处理n个文件。我认为这是正确的方法。我相信这是扇出,对不对?
我并不需要做的,是要合并这些进程重新走到一起。下载完成后,它将回调远程服务,以便处理剩余的过程。该应用程序无需执行其他任何操作。
好的,所以一些代码:
func main() {
queue, err := ConnectToQueue() // This works fine...
if err != nil {
log.Fatalf("Could not connect to queue: %s\n", err)
}
msgChannel := make(chan sqs.Message, 10)
for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ {
go processMessage(msgChannel, queue)
}
for {
response, _ := queue.ReceiveMessage(MAX_SQS_MESSAGES)
for _, m := range response.Messages {
msgChannel <- m
}
}
}
func processMessage(ch <-chan sqs.Message, queue *sqs.Queue) {
for {
m := <-ch
// Do something with message m
// Delete message from queue when we're done
queue.DeleteMessage(&m)
}
}
Run Code Online (Sandbox Code Playgroud)
我在附近任何地方吗?我有n个正在运行的go例程(其中MAX_CONCURRENT_ROUTINES= n),在循环中,我们将继续将消息传递到单个通道中。这是正确的方法吗?我需要关闭任何东西还是可以无限期地运行它?
我注意到的一件事是SQS正在返回消息,但是一旦我将10条消息传递进去processMessage()(其中10条是通道缓冲区的大小),实际上就不会再处理任何消息了。
谢谢大家
看起来不错。一些注意事项:
除了限制生成的工作例程数量之外,您还可以通过其他方式限制工作并行性。例如,您可以为收到的每条消息创建一个 goroutine,然后让生成的 goroutine 等待限制并行性的信号量。当然,存在权衡,但您不仅限于所描述的方式。
sem := make(chan struct{}, n)
work := func(m sqs.Message) {
sem <- struct{}{} // When there's room we can proceed
// do the work
<-sem // Free room in the channel
}()
for _, m := range queue.ReceiveMessage(MAX_SQS_MESSAGES) {
for _, m0 := range m {
go work(m0)
}
}
Run Code Online (Sandbox Code Playgroud)仅处理 10 条消息的限制是由堆栈中的其他地方引起的。可能您正在看到一场比赛,前 10 名填满了通道,然后工作尚未完成,或者您可能不小心从工人例行程序中返回。如果您的员工坚持按照您所描述的模型进行操作,那么您需要确保他们不会回来。
目前尚不清楚您是否希望该进程在处理完一定数量的消息后返回。如果您确实希望退出此进程,则需要等待所有工作人员完成当前任务,并可能通知他们随后返回。看看sync.WaitGroup如何同步它们的完成,并有另一个通道来表明没有更多的工作,或者关闭msgChannel,并在您的工作人员中处理它。(看一下 2 元组返回通道接收表达式。)
| 归档时间: |
|
| 查看次数: |
493 次 |
| 最近记录: |