Cha*_*ant 1 concurrency channel go
我正在编写一个程序来处理文本文件中的数百万行,500k 需要 5 秒来验证文件,我想加快速度。
我想循环遍历这些项目并异步处理其中的 x,然后等待响应以查看是否应该继续。
我写了一些虚拟代码,我不确定我写的是否有意义,它看起来相当复杂,是否有一种更简单更优雅的方法来做到这一点。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
// Need an object to loop over
// need a loop to read the response
items := 100000
concurrency := 20
sem := make(chan bool, concurrency)
returnChan := make(chan error)
finChan := make(chan bool)
var wg sync.WaitGroup
go func() {
for x := 0; x < items; x++ {
// loop over all items
// only do maxitems at a time
wg.Add(1)
sem <- true
go delayFunc(x, sem, returnChan, &wg)
}
wg.Wait()
finChan <- true
}()
var err error
finished := false
for {
select {
case err = <-returnChan:
if err != nil {
break
}
case _ = <-finChan:
finished = true
break
default:
continue
}
if err != nil || finished == true {
break
}
}
fmt.Println(err)
}
func delayFunc(x int, sem chan bool, returnChan chan error, wg *sync.WaitGroup) {
//fmt.Printf("PROCESSING (%v)\n", x)
time.Sleep(10 * time.Millisecond)
<-sem // release the lock
wg.Done()
if x == 95000 {
returnChan <- fmt.Errorf("Something not right")
} else {
returnChan <- nil
}
}
Run Code Online (Sandbox Code Playgroud)
你的代码看起来不错,你实现了 Go 模式中常用的。缺点是 - 你为每个项目生成工作 goroutine。生成 goroutine 虽然便宜但并不是免费的。另一种方法是生成 N 个工作人员并通过通道向他们提供物品。像这样的东西
package main
import (
"fmt"
"time"
)
func main() {
items := 100
concurrency := 10
in := make(chan int)
ret := make(chan error)
for x := 0; x < concurrency; x++ {
go worker(in, ret)
}
go func() {
for x := 0; x < items; x++ {
// loop over all items
in <- x
}
close(in)
}()
for err := range ret {
if err != nil {
fmt.Println(err.Error())
break
}
}
}
func worker(in chan int, returnChan chan error) {
//fmt.Printf("PROCESSING (%v)\n", x)
for x := range in {
if x == 95 {
returnChan <- fmt.Errorf("Something not right")
} else {
returnChan <- nil
}
time.Sleep(10 * time.Millisecond)
}
returnChan <- fmt.Errorf("The End")
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8773 次 |
| 最近记录: |