我正试图绕过goroutines.我创建了一个简单的程序,在多个搜索引擎中并行执行相同的搜索.目前为了跟踪回复的数量,我计算了我收到的数字.虽然看起来有点业余.
有没有更好的方法知道我何时收到以下代码中所有goroutine的回复?
package main
import (
"fmt"
"net/http"
"log"
)
type Query struct {
url string
status string
}
func search (url string, out chan Query) {
fmt.Printf("Fetching URL %s\n", url)
resp, err := http.Get(url)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
out <- Query{url, resp.Status}
}
func main() {
searchTerm := "carrot"
fmt.Println("Hello world! Searching for ", searchTerm)
searchEngines := []string{
"http://www.bing.co.uk/?q=",
"http://www.google.co.uk/?q=",
"http://www.yahoo.co.uk/?q="}
out := make(chan Query)
for i := 0; i < len(searchEngines); i++ { …Run Code Online (Sandbox Code Playgroud) 我正在尝试运行一些goroutines,它们会将结果发送到一个频道.在完成所有goroutine之后,我需要一个让通道关闭的好方法.
我的第一次尝试是在产生所有go例程后关闭它,但我认为在所有goroutine可以发送结果之前,某个频道已关闭.
for i:=0; i<=10;i++{
go func(){
result:=calculate()
c<-result
}()
}
close(c)
for result:= range c{
all_result=append(all_result, result...)
}
Run Code Online (Sandbox Code Playgroud)
然后,我第二次尝试计算一个线程并在没有线程运行后关闭它.
for i:=0; i<=10;i++{
go func(){
atomic.AddUint64(&go_routine_count, 1)
result:=calculate()
c<-result
atomic.AddUint64(&rt_count, ^uint64(0))
}()
}
go func(){
for{
// some little time to let above goroutine count up go_routine_count before this goroutine can actually check go_routine_count==0
time.Sleep(time.Millisecond)
go_current_routine_count:=atomic.LoadUint64(&go_routine_count)
if go_routine_count==0{
close(c)
}
}
}()
for result:= range c{
all_result=append(all_result, result...)
}
Run Code Online (Sandbox Code Playgroud)
它有效,但我觉得可能有更正确或更有效的方式.此外,在某些情况下,如果后来的计数检查goroutine在循环中的goroutines之前运行,则此方法将不起作用.
有没有更好的办法?
我似乎并不完全理解Go中的地图.
我有这个代码:
fetch := map[string]int{some data}
for condition {
fetchlocal := map[string]int{}
for key, value := range fetch {
if condition {
fetchlocal[key] = value
}
}
go threadfunc (fetchlocal)
}
Run Code Online (Sandbox Code Playgroud)
现在,无论threadfunc函数使用fetchlocal变量,Go(go -race)都会发出警告:数据竞争.我也有一些恐慌.但为什么?fetchlocal变量不被任何其他goroutine使用.
有人可以开导我吗?
我们有一个流程,用户可以在该流程中请求从源中获取文件。该来源不是最可靠的,因此我们使用Amazon SQS实现了队列。我们将下载URL放入队列,然后使用在Go中编写的一个小应用程序对其进行轮询。这个应用程序只是检索消息,下载文件,然后将其推送到我们存储它的S3。完成所有这些操作后,它将回调服务,该服务将通过电子邮件向用户发送电子邮件,告知他们文件已准备就绪。
最初,我编写此代码是为了创建n个通道,然后为每个通道附加1个例行程序,并使该例行程序处于无限循环中。这样,我可以确保一次只能处理固定数量的下载。
我意识到这不是应该使用通道的方式,如果我现在正确理解的话,实际上应该有一个通道在该通道上接收n个例程。每个go例程都处于无限循环中,等待一条消息,当它收到消息时,它将处理数据,执行应有的所有操作,完成后将等待下一条消息。这样可以确保我一次只处理n个文件。我认为这是正确的方法。我相信这是扇出,对不对?
我并不需要做的,是要合并这些进程重新走到一起。下载完成后,它将回调远程服务,以便处理剩余的过程。该应用程序无需执行其他任何操作。
好的,所以一些代码:
func main() {
queue, err := ConnectToQueue() // This works fine...
if err != nil {
log.Fatalf("Could not connect to queue: %s\n", err)
}
msgChannel := make(chan sqs.Message, 10)
for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ {
go processMessage(msgChannel, queue)
}
for {
response, _ := queue.ReceiveMessage(MAX_SQS_MESSAGES)
for _, m := range response.Messages {
msgChannel <- m
}
}
}
func processMessage(ch <-chan …Run Code Online (Sandbox Code Playgroud) 我是Go的新手,试图找出同时从中提取信息的最佳方式REST API.目的是对a进行多个并发调用API,每次调用返回不同类型的数据.
我目前有:
s := NewClient()
c1 := make(chan map[string]Service)
c2 := make(chan map[string]ServicePlan)
c3 := make(chan map[string]ServiceInstance)
c4 := make(chan map[string]ServiceBinding)
c5 := make(chan map[string]Organization)
c6 := make(chan map[string]Space)
go func() {
c1 <- GetServices(s)
}()
go func() {
c2 <- GetServicePlans(s)
}()
go func() {
c3 <- GetServiceInstances(s)
}()
go func() {
c4 <- GetServiceBindings(s)
}()
go func() {
c5 <- GetOrganizations(s)
}()
go func() {
c6 <- GetSpaces(s)
}()
services := <- c1 …Run Code Online (Sandbox Code Playgroud) package main
import (
"database/sql"
"fmt"
_ "github.com/lib/pq"
"sync"
)
func main() {
db, _ := sql.Open("postgres", fmt.Sprintf("host=%s dbname=%s user=%s sslmode=disable", "localhost", "dbname", "postgres"))
defer db.Close()
db.SetMaxOpenConns(15)
var wg sync.WaitGroup
for i := 0; i < 15; i++ {
wg.Add(1)
go func() {
defer wg.Done()
//#1
rows, _ := db.Query("SELECT * FROM reviews LIMIT 1")
for rows.Next() {
//#2
db.Exec("SELECT * FROM reviews LIMIT 1")
}
}()
}
wg.Wait()
}
Run Code Online (Sandbox Code Playgroud)
查询1打开15个连接,rows.Next()执行时将关闭它们。但是rows.Next()永远不会执行,因为它包含db.Exec()等待自由连接的内容。
如何解决这个问题呢?
对于以下代码:
func main() {
goRtns := runtime.NumGoroutine()
fmt.Println("goroutines:", goRtns)
}
Run Code Online (Sandbox Code Playgroud)
输出为1。但这是在“过程”中,没有goroutines被明确调用:
“在计算中,进程是正在执行的计算机程序的实例。它包含程序代码及其当前活动。根据操作系统(OS),进程可能由多个执行线程组成,这些线程执行指令同时进行。”
同样来自克里希纳·桑达拉姆(Krishna Sundarram)出色的“ goroutines如何工作”博客文章:http : //blog.nindalf.com/how-goroutines-work/
“创建goroutine不需要太多的内存,仅需要2kB的堆栈空间。它们可以通过根据需要分配和释放堆存储来增长。”
那么我的问题是:运行时库将正在运行的代码实例(我简单的main.go函数)计为goroutine。我是否假定父进程被视为go例程,并且具有相同的内存分配,垃圾回收等规则?假设读取有关goroutine执行的事实与运行它的总体go进程类似,是否明智?关于上面关于goroutine的第二个引号,这听起来像是一个程序在执行程序时增大/缩小其堆栈空间的过程,这是编程的标准范例。
go流程和例程是否共享相同的规则?还是我只是缺少有关报告的goroutine数量的信息。
使用Go,我有大型日志文件.目前我打开它们,创建一个新的扫描仪bufio.NewScanner,然后for scanner.Scan()循环遍历这些线.每行都通过处理函数发送,该函数将其与正则表达式匹配并提取数据.我想使用goroutines同时处理这个文件.我相信这可能比顺序遍历整个文件更快.
每个文件可能需要几秒钟,我想知道我是否可以一次处理10个文件中的单个文件.我相信如果需要我可以牺牲记忆力.我有〜3GB,最大的日志文件可能是75mb.
我看到scanner有一个.Split()方法,你可以提供自定义拆分功能,但我无法使用这种方法找到一个好的解决方案.
我还尝试创建一片切片,在扫描器中循环scanner.Scan()并附scanner.Text()加到每个切片.例如:
// pseudocode because I couldn't get this to work either
scanner := bufio.NewScanner(logInfo)
threads := [[], [], [], [], []]
i := 0
for scanner.Scan() {
i = i + 1
if i > 5 {
i = 0
}
threads[i] = append(threads[i], scanner.Text())
}
fmt.Println(threads)
Run Code Online (Sandbox Code Playgroud)
我是Go的新手,关注效率和性能.我想学习如何编写好的Go代码!任何帮助或建议真的很感激.
我有一个要抓取的网址列表。我要做的是将所有成功抓取的页面数据存储到一个通道中,完成后将其转储到一个切片中。我不知道我将成功获取多少次,因此我无法指定固定长度。我希望代码能够到达wg.Wait(),然后等待直到所有wg.Done()方法都被调用,但是我从未到达该close(queue)语句。寻找类似的答案,我遇到了这个答案
作者做了类似的事情:
ports := make(chan string)
toScan := make(chan int)
var wg sync.WaitGroup
// make 100 workers for dialing
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for p := range toScan {
ports <- worker(*host, p)
}
}()
}
// close our receiving ports channel once all workers are done
go func() {
wg.Wait()
close(ports)
}()
Run Code Online (Sandbox Code Playgroud)
我将自己包裹wg.Wait()在goroutine中后,close(queue)就可以到达:
urls := getListOfURLS()
activities …Run Code Online (Sandbox Code Playgroud) 我有要处理的网址列表,但我想一次运行最大数量的goroutine。例如,如果我有30个网址,那么我只希望10个goroutine并行工作。
我对此的尝试如下:
parallel := flag.Int("parallel", 10, "max parallel requests allowed")
flag.Parse()
urls := flag.Args()
var wg sync.WaitGroup
client := rest.Client{}
results := make(chan string, *parallel)
for _, url := range urls {
wg.Add(1)
go worker(url, client, results, &wg)
}
for res := range results {
fmt.Println(res)
}
wg.Wait()
close(results)
Run Code Online (Sandbox Code Playgroud)
我的理解是,如果我创建一个大小为并行的缓冲通道,那么该代码将阻塞,直到我读出结果通道为止,这将取消阻塞我的代码并允许生成另一个goroutine。但是,此代码似乎在处理完所有网址后不会阻塞。有人可以向我解释如何使用通道限制运行的goroutine的数量吗?
go ×10
goroutine ×10
channel ×2
asynchronous ×1
concurrency ×1
memory ×1
postgresql ×1
process ×1