Go中的并行压缩压缩

Aks*_*wat 7 zip go

我正在尝试zip从大量中小型文件构建存档.我希望能够同时执行此操作,因为压缩是CPU密集型的,而且我正在多核服务器上运行.此外,我不希望将整个存档存储在内存中,因为它可能会变得很大.

我的问题是,我是否必须压缩每个文件然后手动组合所有内容与zip标题,校验和等?

任何帮助将不胜感激.

Att*_* O. 6

我不认为你可以结合使用zip标头.

你可以做的是,zip.Writer在一个单独的goroutine中按顺序运行,然后为你想要读取的每个文件生成一个新的goroutine,并将它们传递给正在压缩它们的goroutine.

这应该可以减少通过顺序读取文件所获得的IO开销,尽管它可能不会利用多个内核进行归档.

这是一个有效的例子.请注意,为了简单起见,

  • 它不能很好地处理错误,如果出现问题就会发生恐慌,
  • 并且它不会defer过多地使用该语句来证明事情应该发生的顺序.

由于defer 是LIFO,当你将很多它们堆叠在一起时,它有时会令人困惑.

package main

import (
    "archive/zip"
    "io"
    "os"
    "sync"
)

func ZipWriter(files chan *os.File) *sync.WaitGroup {
    f, err := os.Create("out.zip")
    if err != nil {
        panic(err)
    }
    var wg sync.WaitGroup
    wg.Add(1)
    zw := zip.NewWriter(f)
    go func() {
        // Note the order (LIFO):
        defer wg.Done() // 2. signal that we're done
        defer f.Close() // 1. close the file
        var err error
        var fw io.Writer
        for f := range files {
            // Loop until channel is closed.
            if fw, err = zw.Create(f.Name()); err != nil {
                panic(err)
            }
            io.Copy(fw, f)
            if err = f.Close(); err != nil {
                panic(err)
            }
        }
        // The zip writer must be closed *before* f.Close() is called!
        if err = zw.Close(); err != nil {
            panic(err)
        }
    }()
    return &wg
}

func main() {
    files := make(chan *os.File)
    wait := ZipWriter(files)

    // Send all files to the zip writer.
    var wg sync.WaitGroup
    wg.Add(len(os.Args)-1)
    for i, name := range os.Args {
        if i == 0 {
            continue
        }
        // Read each file in parallel:
        go func(name string) {
            defer wg.Done()
            f, err := os.Open(name)
            if err != nil {
                panic(err)
            }
            files <- f
        }(name)
    }

    wg.Wait()
    // Once we're done sending the files, we can close the channel.
    close(files)
    // This will cause ZipWriter to break out of the loop, close the file,
    // and unblock the next mutex:
    wait.Wait()
}
Run Code Online (Sandbox Code Playgroud)

用法:go run example.go /path/to/*.log.

这是事情应该发生的顺序:

  1. 打开输出文件进行写入.
  2. zip.Writer使用该文件创建一个.
  3. 启动goroutine监听频道上的文件.
  4. 浏览每个文件,这可以在每个文件的一个goroutine中完成.
  5. 将每个文件发送到步骤3中创建的goroutine.
  6. 处理完所述goroutine中的每个文件后,关闭该文件以释放资源.
  7. 将每个文件发送到所述goroutine后,关闭该频道.
  8. 等到压缩完成(按顺序完成).
  9. 一旦完成压缩(通道耗尽),就应关闭zip writer.
  10. 仅当zip writer关闭时,才应关闭输出文件.
  11. 最后一切都关闭了,所以关闭sync.WaitGroup告诉调用函数我们很高兴.(这里也可以使用一个频道,但sync.WaitGroup看起来更优雅.)
  12. 当您从zip编写器获得所有内容都已正确关闭的信号时,您可以main很好地退出并终止.

这可能无法解答您的问题,但我一直在使用类似的代码为Web服务即时生成zip存档.虽然实际的拉链是在一个goroutine中进行的,但它表现得相当不错.克服IO瓶颈已经是一种改进.