我目前在Perl中处理了5个巨大的(每个400万行)日志文件,我想我可能会尝试在Go及其并发功能中实现相同的功能.因此,在Go中缺乏经验,我在考虑如下所示.任何关于该方法的评论将不胜感激.一些粗糙的伪代码:
var wg1 sync.WaitGroup
var wg2 sync.WaitGroup
func processRow (r Row) {
wg2.Add(1)
defer wg2.Done()
res = <process r>
return res
}
func processFile(f File) {
wg1.Add(1)
open(newfile File)
defer wg1.Done()
line = <row from f>
result = go processRow(line)
newFile.Println(result) // Write new processed line to newFile
wg2.Wait()
newFile.Close()
}
func main() {
for each f logfile {
go processFile(f)
}
wg1.Wait()
}
Run Code Online (Sandbox Code Playgroud)
所以,想法是我同时处理这5个文件,然后每个文件的所有行也将同时处理.
那会有用吗?
您绝对应该使用渠道来管理已处理的行.或者你也可以写另一个goroutine来处理你的输出.
var numGoWriters = 10
func processRow(r Row, ch chan<- string) {
res := process(r)
ch <- res
}
func writeRow(f File, ch <-chan string) {
w := bufio.NewWriter(f)
for s := range ch {
_, err := w.WriteString(s + "\n")
}
func processFile(f File) {
outFile, err := os.Create("/path/to/file.out")
if err != nil {
// handle it
}
defer outFile.Close()
var wg sync.WaitGroup
ch := make(chan string, 10) // play with this number for performance
defer close(ch) // once we're done processing rows, we close the channel
// so our worker threads exit
fScanner := bufio.NewScanner(f)
for fScanner.Scan() {
wg.Add(1)
go func() {
processRow(fScanner.Text(), ch)
wg.Done()
}()
}
for i := 0; i < numGoWriters; i++ {
go writeRow(outFile, ch)
}
wg.Wait()
}
Run Code Online (Sandbox Code Playgroud)
在这里,我们processRow进行了所有处理(我假设string),writeRow执行所有输出I/O,processFile并将每个文件绑在一起.然后所有main必须做的就是交出文件,生成goroutines,等等.
func main() {
var wg sync.WaitGroup
filenames := [...]string{"here", "are", "some", "log", "paths"}
for fname := range filenames {
inFile, err := os.Open(fname)
if err != nil {
// handle it
}
defer inFile.Close()
wg.Add(1)
go processFile(inFile)
}
wg.Wait()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
944 次 |
| 最近记录: |