imm*_*eel 2 concurrency channel go goroutine
我正在尝试了解通道和 goroutines 并尝试编写一个 goroutine 来向服务器发出并发 API 请求
但是当我使用 goroutine 运行代码时,它似乎花费了与没有 goroutine 相同的时间。
func sendUser(user string, ch chan<- string) {
resp,err := http.get("URL"/user)
//do the processing and get resp=string
ch <- resp
}
func AsyncHTTP(users []string) ([]string, error) {
ch := make(chan string)
var responses []string
var user string
for _ , user = range users {
go sendUser(user, ch)
for {
select {
case r := <-ch:
if r.err != nil {
fmt.Println(r.err)
}
responses = append(responses, r)
**//Is there a better way to show that the processing of response is complete**?
if len(responses) == len(users) {
return responses, nil
}
case <-time.After(50 * time.Millisecond):
fmt.Printf(".")
}
}
}
return responses, nil
}
Run Code Online (Sandbox Code Playgroud)
问题:
即使我使用了 goroutine,请求完成时间和没有 goroutine 时一样吗?我在使用 goroutines 时做错了什么吗?
为了告诉工作不要再在这里等待,我正在使用:
if len(responses) == len(users)
Run Code Online (Sandbox Code Playgroud)
有没有更好的方法来表明响应的处理已经完成并告诉 ch 不要再等待了?
什么是wait.Syncgroup?如何在我的 goroutine 中使用它?
我可能会做这样的事情..
func sendUser(user string, ch chan<- string, wg *sync.WaitGroup) {
defer wg.Done()
resp, err := http.Get("URL/" + user)
if err != nil {
log.Println("err handle it")
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Println("err handle it")
}
ch <- string(b)
}
func AsyncHTTP(users []string) ([]string, error) {
ch := make(chan string)
var responses []string
var user string
var wg sync.WaitGroup
for _, user = range users {
wg.Add(1)
go sendUser(user, ch, &wg)
}
// close the channel in the background
go func() {
wg.Wait()
close(ch)
}()
// read from channel as they come in until its closed
for res := range ch {
responses = append(responses, res)
}
return responses, nil
}
Run Code Online (Sandbox Code Playgroud)
它允许在发送时从通道读取。通过使用等待组,我将知道何时关闭通道。通过将等待组和关闭放在 goroutine 中,我可以“实时”从通道中读取数据而不会阻塞。
对于有界并行/速率限制,我们可以看一下 https://blog.golang.org/pipelines#TOC_9 中的示例。
基本上步骤是:
N工作协程,每个协程消耗相同(共享)的输入通道。从输入通道获取参数,调用 API,将结果发送到结果通道。sync.WaitGroup用于等待所有工作协程完成(在输入通道耗尽后)。
下面是它的代码示例(您可以立即运行它,尝试更改NUM_PARALLEL为不同数量的并行度)。更改BASE_URL为您的基本网址。
package main
import (
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"
)
// placeholder url. Change it to your base url.
const BASE_URL = "https://jsonplaceholder.typicode.com/posts/"
// number of parallelism
const NUM_PARALLEL = 20
// Stream inputs to input channel
func streamInputs(done <-chan struct{}, inputs []string) <-chan string {
inputCh := make(chan string)
go func() {
defer close(inputCh)
for _, input := range inputs {
select {
case inputCh <- input:
case <-done:
// in case done is closed prematurely (because error midway),
// finish the loop (closing input channel)
break
}
}
}()
return inputCh
}
// Normal function for HTTP call, no knowledge of goroutine/channels
func sendUser(user string) (string, error) {
url := BASE_URL + user
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
bodyStr := string(body)
return bodyStr, nil
}
// Wrapper for sendUser return value, used as result channel type
type result struct {
bodyStr string
err error
}
func AsyncHTTP(users []string) ([]string, error) {
done := make(chan struct{})
defer close(done)
inputCh := streamInputs(done, users)
var wg sync.WaitGroup
// bulk add goroutine counter at the start
wg.Add(NUM_PARALLEL)
resultCh := make(chan result)
for i := 0; i < NUM_PARALLEL; i++ {
// spawn N worker goroutines, each is consuming a shared input channel.
go func() {
for input := range inputCh {
bodyStr, err := sendUser(input)
resultCh <- result{bodyStr, err}
}
wg.Done()
}()
}
// Wait all worker goroutines to finish. Happens if there's no error (no early return)
go func() {
wg.Wait()
close(resultCh)
}()
results := []string{}
for result := range resultCh {
if result.err != nil {
// return early. done channel is closed, thus input channel is also closed.
// all worker goroutines stop working (because input channel is closed)
return nil, result.err
}
results = append(results, result.bodyStr)
}
return results, nil
}
func main() {
// populate users param
users := []string{}
for i := 1; i <= 100; i++ {
users = append(users, strconv.Itoa(i))
}
start := time.Now()
results, err := AsyncHTTP(users)
if err != nil {
fmt.Println(err)
return
}
for _, result := range results {
fmt.Println(result)
}
fmt.Println("finished in ", time.Since(start))
}
Run Code Online (Sandbox Code Playgroud)