Sco*_*zer 10 concurrency multithreading go goroutine
我正在尝试用goroutines编写一个简单的工作池.
work_channel?码:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
defer wg.Done()
sleepMs := rand.Intn(1000)
fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
o <- work + fmt.Sprintf("-%dms", sleepMs)
}
func main() {
var work_channel = make(chan string)
var results_channel = make(chan string)
// create goroutine per item in work_channel
go func() {
var c = 0
var wg sync.WaitGroup
for work := range work_channel {
wg.Add(1)
go worker(fmt.Sprintf("%d", c), work, results_channel, &wg)
c++
}
wg.Wait()
fmt.Println("closing results channel")
close(results_channel)
}()
// add work to the work_channel
go func() {
for c := 'a'; c < 'z'; c++ {
work_channel <- fmt.Sprintf("%c", c)
}
close(work_channel)
fmt.Println("sent work to work_channel")
}()
for x := range results_channel {
fmt.Printf("result: %s\n", x)
}
}
Run Code Online (Sandbox Code Playgroud)
icz*_*cza 32
在任何意义上,您的解决方案都不是工作者goroutine池:您的代码不会限制并发goroutine,并且它不会"重用"goroutine(它会在收到新作业时启动一个新的).
在Bruteforce MD5密码破解程序上发布,您可以使用生产者 - 消费者模式.你可以有一个指定的生产者 goroutine,它可以生成作业(要做的事情/计算),并在作业通道上发送它们.您可以拥有一个固定的消费者 goroutine 池(例如其中的5个),这些池将循环通过作业交付的通道,并且每个都将执行/完成接收的作业.
该制片人够程可以简单地关闭jobs时生成和发送的所有工作,妥善信令信道的消费者没有更多的就业机会将到来.for ... range通道上的构造处理"关闭"事件并正确终止.请注意,在关闭频道之前发送的所有作业仍将被传递.
这将导致干净的设计,将导致固定(但任意)数量的goroutine,并且它将始终使用100%CPU(如果goroutines的#大于CPU核心数).它还具有以下优点:可以通过正确选择信道容量(缓冲信道)和消费者 goroutine 的数量来"限制"它.
请注意,此模型具有指定的生产者goroutine不是强制性的.你也可以有多个goroutine来生成作业,但是你必须同步它们以便jobs在所有生产者goroutine完成生成作业时关闭通道 - 否则jobs当它已经被关闭时尝试在通道上发送另一个作业会导致运行时恐慌.通常生产工作是便宜的,并且可以比他们可以执行的更快的速度生产,所以这种模型在1 goroutine中生产它们而许多人正在消耗/执行它们在实践中是好的.
处理结果:
如果作业有结果,您可以选择指定结果通道,可以在其上发送结果("发回"),或者您可以选择在作业完成/完成时处理消费者的结果.后者甚至可以通过具有处理结果的"回调"函数来实现.重要的是,结果是可以独立处理还是需要合并(例如map-reduce框架)或聚合.
如果你使用results频道,你还需要一个从中接收值的goroutine,防止消费者被阻止(如果缓冲区results将被填充则会发生).
results渠道我不是将简单string值作为作业和结果发送,而是创建一个包装类型,它可以保存任何其他信息,因此它更灵活:
type Job struct {
Id int
Work string
Result string
}
Run Code Online (Sandbox Code Playgroud)
请注意,Jobstruct也会包装结果,因此当我们发回结果时,它还包含原始内容Job作为上下文 - 通常非常有用.还要注意,仅*Job在通道上发送指针()而不是Job值是有利的,因此不需要制作Jobs的"无数"副本,并且Job结构值的大小也变得无关紧要.
以下是这个生产者 - 消费者的样子:
我会使用2个sync.WaitGroup值,他们的角色将遵循:
var wg, wg2 sync.WaitGroup
Run Code Online (Sandbox Code Playgroud)
生产者负责生成要执行的作业:
func produce(jobs chan<- *Job) {
// Generate jobs:
id := 0
for c := 'a'; c <= 'z'; c++ {
id++
jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
}
close(jobs)
}
Run Code Online (Sandbox Code Playgroud)
完成后(没有更多工作),jobs渠道关闭,向消费者发出信号,表明不再有工作岗位到达.
请注意,produce()将jobs通道视为仅发送,因为这是生产者只需要执行的操作:在其上发送作业(除了关闭它,但在仅发送通道上也允许).生产者中的意外接收将是编译时错误(在编译时提前检测到).
只要可以接收工作,消费者的责任就是接收工作,并执行它们:
func consume(id int, jobs <-chan *Job, results chan<- *Job) {
defer wg.Done()
for job := range jobs {
sleepMs := rand.Intn(1000)
fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs)
results <- job
}
}
Run Code Online (Sandbox Code Playgroud)
请注意,consume()将jobs频道视为仅接收 ; 消费者只需要从中接收.类似地,仅为消费者发送results频道.
另请注意,此处的results通道无法关闭,因为有多个消费者goroutine,只有第一次尝试关闭它会成功,而进一步会导致运行时恐慌!results所有消费者goroutine结束后,渠道可以(必须)关闭,因为我们可以确定在results渠道上不会再发送任何其他值(结果).
我们有结果需要分析:
func analyze(results <-chan *Job) {
defer wg2.Done()
for job := range results {
fmt.Printf("result: %s\n", job.Result)
}
}
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,只要它们可能出现(直到results通道关闭),它也会收到结果.results分析仪的通道仅接收.
请注意通道类型的使用:只要它足够,在编译时只使用单向通道类型来及早发现并防止错误.如果您确实需要两个方向,请仅使用双向通道类型.
这就是所有这些粘在一起的方式:
func main() {
jobs := make(chan *Job, 100) // Buffered channel
results := make(chan *Job, 100) // Buffered channel
// Start consumers:
for i := 0; i < 5; i++ { // 5 consumers
wg.Add(1)
go consume(i, jobs, results)
}
// Start producing
go produce(jobs)
// Start analyzing:
wg2.Add(1)
go analyze(results)
wg.Wait() // Wait all consumers to finish processing jobs
// All jobs are processed, no more values will be sent on results:
close(results)
wg2.Wait() // Wait analyzer to analyze all results
}
Run Code Online (Sandbox Code Playgroud)
示例输出:
这是一个示例输出:
正如您所看到的,在所有作业入队之前,结果即将到来并进行分析:
worker #4 received: 'e', sleep 81ms
worker #0 received: 'a', sleep 887ms
worker #1 received: 'b', sleep 847ms
worker #2 received: 'c', sleep 59ms
worker #3 received: 'd', sleep 81ms
worker #2 received: 'f', sleep 318ms
result: c-59ms
worker #4 received: 'g', sleep 425ms
result: e-81ms
worker #3 received: 'h', sleep 540ms
result: d-81ms
worker #2 received: 'i', sleep 456ms
result: f-318ms
worker #4 received: 'j', sleep 300ms
result: g-425ms
worker #3 received: 'k', sleep 694ms
result: h-540ms
worker #4 received: 'l', sleep 511ms
result: j-300ms
worker #2 received: 'm', sleep 162ms
result: i-456ms
worker #1 received: 'n', sleep 89ms
result: b-847ms
worker #0 received: 'o', sleep 728ms
result: a-887ms
worker #1 received: 'p', sleep 274ms
result: n-89ms
worker #2 received: 'q', sleep 211ms
result: m-162ms
worker #2 received: 'r', sleep 445ms
result: q-211ms
worker #1 received: 's', sleep 237ms
result: p-274ms
worker #3 received: 't', sleep 106ms
result: k-694ms
worker #4 received: 'u', sleep 495ms
result: l-511ms
worker #3 received: 'v', sleep 466ms
result: t-106ms
worker #1 received: 'w', sleep 528ms
result: s-237ms
worker #0 received: 'x', sleep 258ms
result: o-728ms
worker #2 received: 'y', sleep 47ms
result: r-445ms
worker #2 received: 'z', sleep 947ms
result: y-47ms
result: u-495ms
result: x-258ms
result: v-466ms
result: w-528ms
result: z-947ms
Run Code Online (Sandbox Code Playgroud)
在Go Playground上试用完整的应用程序.
results频道如果我们不使用results频道但消费者goroutines立即处理结果(在我们的情况下打印它),代码会显着简化.在这种情况下,我们不需要2个sync.WaitGroup值(第二个只需要等待分析仪完成).
没有results渠道,完整的解决方案是这样的:
var wg sync.WaitGroup
type Job struct {
Id int
Work string
}
func produce(jobs chan<- *Job) {
// Generate jobs:
id := 0
for c := 'a'; c <= 'z'; c++ {
id++
jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
}
close(jobs)
}
func consume(id int, jobs <-chan *Job) {
defer wg.Done()
for job := range jobs {
sleepMs := rand.Intn(1000)
fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs))
}
}
func main() {
jobs := make(chan *Job, 100) // Buffered channel
// Start consumers:
for i := 0; i < 5; i++ { // 5 consumers
wg.Add(1)
go consume(i, jobs)
}
// Start producing
go produce(jobs)
wg.Wait() // Wait all consumers to finish processing jobs
}
Run Code Online (Sandbox Code Playgroud)
输出与results通道的"相似" (但当然执行/完成顺序是随机的).
在Go Playground上试试这个变种.