一切顺利都是睡着了

b0x*_*d1n 0 concurrency multithreading go

我正在学习如何进行并发,并且我已将其编写为自己的应用程序,以便我可以在它工作后将其移植到不同的项目中.

我正在添加它的项目基本上将RowInfo发送到全局QueueChannel,然后我的工作人员应该接受这项工作并处理它.如果我将具有相同ID的两行排队,并且其中一行当前正由工作人员处理,我将从队列中删除重复行(您可以在调度程序中看到我在"继续"的位置).

这个排队/工作人员代码将在阻止ListenAndServe的Web服务器上运行,因此我希望它始终保持运行,并且工作人员始终保持积极寻找工作.我不想关闭频道(除非我按c + +给应用程序或其他东西).我怀疑我得到的错误与不关闭通道有关,因为这是很多其他线程提到这个错误似乎表明,但我不确定它与我所拥有的代码有什么关系.

终端错误输出:

[~/go/src/github.com/zzz/asynch]> go run main.go
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
    /home/zzz/go/src/github.com/zzz/asynch/main.go:29 +0x14b

goroutine 5 [select]:
main.diszzzcher(0xc82001a120, 0xc82001a180, 0xc82001a1e0)
    /home/zzz/go/src/github.com/zzz/asynch/main.go:42 +0x21a
created by main.main
    /home/zzz/go/src/github.com/zzz/asynch/main.go:19 +0xb1

goroutine 6 [chan receive]:
main.worker(0xc82001a180, 0xc82001a1e0)
    /home/zzz/go/src/github.com/zzz/asynch/main.go:55 +0x54
created by main.main
    /home/zzz/go/src/github.com/zzz/asynch/main.go:24 +0xf7

goroutine 7 [chan receive]:
main.worker(0xc82001a180, 0xc82001a1e0)
    /home/zzz/go/src/github.com/zzz/asynch/main.go:55 +0x54
created by main.main
    /home/zzz/go/src/github.com/zzz/asynch/main.go:24 +0xf7

goroutine 8 [chan receive]:
main.worker(0xc82001a180, 0xc82001a1e0)
    /home/zzz/go/src/github.com/zzz/asynch/main.go:55 +0x54
created by main.main
    /home/zzz/go/src/github.com/zzz/asynch/main.go:24 +0xf7

goroutine 9 [chan receive]:
main.worker(0xc82001a180, 0xc82001a1e0)
    /home/zzz/go/src/github.com/zzz/asynch/main.go:55 +0x54
created by main.main
    /home/zzz/go/src/github.com/zzz/asynch/main.go:24 +0xf7
exit status 2
Run Code Online (Sandbox Code Playgroud)

码:

package main

import (
    "log"
    "time"
)

type RowInfo struct {
    id int64
}

var QueueChan chan RowInfo

func main() {
    QueueChan := make(chan RowInfo)
    workerChan := make(chan RowInfo)
    exitChan := make(chan int64)

    go dispatcher(QueueChan, workerChan, exitChan)

    // Start WorkerCount number of workers
    workerCount := 4
    for i := 0; i < workerCount; i++ {
        go worker(workerChan, exitChan)
    }

    // Send test data
    for i := 0; i < 12; i++ {
        QueueChan <- RowInfo{id: int64(i)}
    }

    // Prevent app close
    for {
        time.Sleep(1 * time.Second)
    }
}

func dispatcher(queueChan, workerChan chan RowInfo, exitChan chan int64) {
    state := make(map[int64]bool)

    for {
        select {
        case job := <-QueueChan:
            if state[job.id] == true {
                continue
            }
            workerChan <- job
        case result := <-exitChan:
            state[result] = false
        }
    }
}

func worker(workerChan chan RowInfo, exitChan chan int64) {
    for job := range workerChan {
        log.Printf("Doing work on job rowInfo ID: %d", job.id)

        // Finish job
        exitChan <- job.id
    }
}
Run Code Online (Sandbox Code Playgroud)

谢谢.

mrd*_*l4r 5

错误告诉你:所有goroutine都睡着了,程序已经死锁.

现在为什么你的所有goroutines都睡着了?让我们一个一个检查:

  • worker够程:无限期地等待新的工作workerChan,将不会退出,直到workerChan被关闭,处于睡眠状态时,它会等待新的工作
  • dispatcher够程:一直循环选择在两个频道.永远不会退出,在等待时睡着了select
  • main够程:永远循环上time.Sleep,永远不会退出,是睡着了大部分的时间

通常情况下,在这种情况下,你会引入一个chan struct{}(称之为closing或类似的东西)并将其包含在你的selects中.如果你想关闭程序,只需close(closing).该select会选择<-closing选项,返回够程.您还应该添加一个sync.WaitGroup在所有goroutine退出时收到通知.