I'm going through the Go tour, and I feel like I have a pretty good handle on the language except for concurrency.
On slide 72 there is an exercise that asks the reader to parallelize a web crawler (and to make it not fetch duplicates, but I haven't gotten there yet).
Here is what I have so far:
func Crawl(url string, depth int, fetcher Fetcher, ch chan string) {
    if depth <= 0 {
        return
    }
    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        ch <- fmt.Sprintln(err)
        return
    }
    ch <- fmt.Sprintf("found: %s %q\n", url, body)
    for _, u := range urls {
        go Crawl(u, depth-1, fetcher, ch)
    }
}

func main() {
    ch := make(chan string, 100)
    go Crawl("http://golang.org/", 4, fetcher, ch)
    for i := range ch {
        fmt.Println(i)
    }
}
The problem I'm having is where to put the call to close(ch). If I put a defer close(ch) somewhere in the Crawl method, I end up writing to a closed channel from one of the spawned goroutines, because the method finishes executing before the goroutines it spawned do.
If I omit the call to close(ch), as in my example code, the program deadlocks after all the goroutines have finished: the main goroutine is still waiting on the channel in its for loop, but the channel is never closed.
fas*_*mat 17
A look at the Parallelization section of Effective Go leads to ideas for the solution. You have to close the channel on every return path of the function. Actually, this is a nice use case for the defer statement:
func Crawl(url string, depth int, fetcher Fetcher, ret chan string) {
    defer close(ret)
    if depth <= 0 {
        return
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        ret <- err.Error()
        return
    }

    ret <- fmt.Sprintf("found: %s %q", url, body)

    result := make([]chan string, len(urls))
    for i, u := range urls {
        result[i] = make(chan string)
        go Crawl(u, depth-1, fetcher, result[i])
    }

    for i := range result {
        for s := range result[i] {
            ret <- s
        }
    }

    return
}

func main() {
    result := make(chan string)
    go Crawl("http://golang.org/", 4, fetcher, result)

    for s := range result {
        fmt.Println(s)
    }
}
The essential difference from your code is that every instance of Crawl gets its own return channel, and the calling function collects the results from its children's channels.
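Stripped of the crawling details, the pattern is: each child goroutine writes to (and closes) its own channel, and the parent drains those channels one after another. A minimal standalone sketch of just that pattern (worker and its fake results are made up for illustration):

package main

import "fmt"

// Each worker owns its output channel and closes it when done; the
// parent ranges over each channel in turn, so it knows exactly when
// every worker has finished.
func worker(id int, out chan string) {
    defer close(out)
    out <- fmt.Sprintf("result from worker %d", id)
}

func main() {
    results := make([]chan string, 3)
    for i := range results {
        results[i] = make(chan string)
        go worker(i, results[i])
    }
    for _, ch := range results {
        for s := range ch {
            fmt.Println(s)
        }
    }
}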
I went in a completely different direction with this. I might have been misled by the tip about using a map.
// SafeUrlMap is safe to use concurrently.
type SafeUrlMap struct {
    v   map[string]string
    mux sync.Mutex
}

func (c *SafeUrlMap) Set(key string, body string) {
    c.mux.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    c.v[key] = body
    c.mux.Unlock()
}

// Value returns mapped value for the given key.
func (c *SafeUrlMap) Value(key string) (string, bool) {
    c.mux.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    defer c.mux.Unlock()
    val, ok := c.v[key]
    return val, ok
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, urlMap *SafeUrlMap) {
    defer wg.Done()

    if depth <= 0 {
        return
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }

    // Record the body only after a successful fetch.
    urlMap.Set(url, body)

    for _, u := range urls {
        if _, ok := urlMap.Value(u); !ok {
            wg.Add(1)
            go Crawl(u, depth-1, fetcher, urlMap)
        }
    }

    return
}

var wg sync.WaitGroup

func main() {
    urlMap := &SafeUrlMap{v: make(map[string]string)}

    wg.Add(1)
    go Crawl("http://golang.org/", 4, fetcher, urlMap)
    wg.Wait()

    for url := range urlMap.v {
        body, _ := urlMap.Value(url)
        fmt.Printf("found: %s %q\n", url, body)
    }
}
小智 8
Similar idea to the accepted answer, but without fetching duplicate URLs, and printing directly to the console. defer() isn't used either. Channels are used to signal when a goroutine has completed. The SafeMap idea is lifted from the SafeCounter given earlier in the tour.
For the child goroutines we create a slice of channels, and wait for each child to return by waiting on its channel.
package main

import (
    "fmt"
    "sync"
)

// SafeMap is safe to use concurrently.
type SafeMap struct {
    v   map[string]bool
    mux sync.Mutex
}

// SetVal sets the value for the given key.
func (m *SafeMap) SetVal(key string, val bool) {
    m.mux.Lock()
    // Lock so only one goroutine at a time can access the map m.v.
    m.v[key] = val
    m.mux.Unlock()
}

// GetVal returns the current value for the given key.
func (m *SafeMap) GetVal(key string) bool {
    m.mux.Lock()
    // Lock so only one goroutine at a time can access the map m.v.
    defer m.mux.Unlock()
    return m.v[key]
}

type Fetcher interface {
    // Fetch returns the body of URL and
    // a slice of URLs found on that page.
    Fetch(url string) (body string, urls []string, err error)
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, status chan bool, urlMap *SafeMap) {
    // Check if we fetched this url previously.
    if ok := urlMap.GetVal(url); ok {
        //fmt.Println("Already fetched url!")
        status <- true
        return
    }

    // Mark this url as fetched.
    urlMap.SetVal(url, true)

    if depth <= 0 {
        status <- false
        return
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        status <- false
        return
    }

    fmt.Printf("found: %s %q\n", url, body)

    statuses := make([]chan bool, len(urls))
    for index, u := range urls {
        statuses[index] = make(chan bool)
        go Crawl(u, depth-1, fetcher, statuses[index], urlMap)
    }

    // Wait for child goroutines.
    for _, childstatus := range statuses {
        <-childstatus
    }

    // And now this goroutine can finish.
    status <- true
    return
}

func main() {
    // fetcher is the tour exercise's fakeFetcher (not shown here).
    urlMap := &SafeMap{v: make(map[string]bool)}
    status := make(chan bool)
    go Crawl("https://golang.org/", 4, fetcher, status, urlMap)
    <-status
}
Doing an O(1) lookup of a URL in a map, instead of an O(n) lookup over a slice of all the URLs visited, should help minimize the time spent inside the critical section. That is a trivial amount of time for this example, but it would matter at scale.
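Roughly, the difference between the two visited-set checks looks like this (seenSlice and seenMap are hypothetical helpers for illustration, not part of the code below; "sync" is assumed to be imported):

// Hypothetical slice-based check: the scan of every previously
// visited URL happens while the mutex is held, so the critical
// section grows with the number of URLs seen so far.
func seenSlice(url string, visited *[]string, mux *sync.Mutex) bool {
    mux.Lock()
    defer mux.Unlock()
    for _, v := range *visited { // O(n) inside the critical section
        if v == url {
            return true
        }
    }
    *visited = append(*visited, url)
    return false
}

// Map-based check (the approach taken below): O(1) lookup and insert.
func seenMap(url string, visited map[string]bool, mux *sync.Mutex) bool {
    mux.Lock()
    defer mux.Unlock()
    if visited[url] {
        return true
    }
    visited[url] = true
    return false
}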
A WaitGroup is used to keep the top-level Crawl() function from returning until all of its child goroutines are done.
func Crawl(url string, depth int, fetcher Fetcher) {
    var str_map = make(map[string]bool)
    var mux sync.Mutex
    var wg sync.WaitGroup

    var crawler func(string, int)
    crawler = func(url string, depth int) {
        defer wg.Done()

        if depth <= 0 {
            return
        }

        mux.Lock()
        if _, ok := str_map[url]; ok {
            mux.Unlock()
            return
        } else {
            str_map[url] = true
            mux.Unlock()
        }

        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("found: %s %q %q\n", url, body, urls)

        for _, u := range urls {
            wg.Add(1)
            go crawler(u, depth-1)
        }
    }

    wg.Add(1)
    crawler(url, depth)
    wg.Wait()
}

func main() {
    Crawl("http://golang.org/", 4, fetcher)
}