我编写了一个简单的小例子,将1000万条记录插入mongodb.我开始按顺序工作.然后我查找了如何进行并发,并找到了goroutines.这看起来像我想要的,但它并不像我期望的那样表现.我实现了一个WaitGroup来阻止程序在所有goroutine完成之前退出,但我仍然遇到问题.
所以我将从正在发生的事情开始然后显示代码.当我运行没有goroutine的代码时,所有1000万条记录都插入mongodb罚款.然而,当我添加goroutine时,一些不确定的数量被输入..一般在8500左右给予或采取几百.我检查了mongodb日志,看它是否有问题,没有任何显示.所以我不确定是什么,可能是,只是没有被记录.无论如何,这是代码:
(旁注:我一次只做1条记录,但我把它拆分成一种方法,所以我可以在将来一次测试多条记录......只是还没弄明白如何用mongodb做到这一点然而.)
package main
import (
"fmt"
"labix.org/v2/mgo"
"strconv"
"time"
"sync"
)
// structs
type Reading struct {
Id string
Name string
}
var waitGroup sync.WaitGroup
// methods
func main() {
// Setup timer
startTime := time.Now()
// Setup collection
collection := getCollection("test", "readings")
fmt.Println("collection complete: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
// Setup readings
readings := prepareReadings()
fmt.Println("readings prepared: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
// Insert readings
for i := 1; i <= 1000000; i++ {
waitGroup.Add(1)
go insertReadings(collection, readings)
// fmt.Print(".")
if i % 1000 == 0 {
fmt.Println("1000 readings queued for insert: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}
}
waitGroup.Wait()
fmt.Println("all readings inserted: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}
func getCollection(databaseName string, tableName string) *mgo.Collection {
session, err := mgo.Dial("localhost")
if err != nil {
// panic(err)
fmt.Println("error getCollection:", err)
}
// defer session.Close()
// Optional. Switch the session to a monotonic behavior.
// session.SetMode(mgo.Monotonic, true)
collection := session.DB(databaseName).C(tableName)
return collection
}
func insertReadings(collection *mgo.Collection, readings []Reading) {
err := collection.Insert(readings)
if err != nil {
// panic(err)
fmt.Println("error insertReadings:", err)
}
waitGroup.Done()
}
func prepareReadings() []Reading {
var readings []Reading
for i := 1; i <= 1; i++ {
readings = append(readings, Reading{Name: "Thing"})
}
return readings
}
Run Code Online (Sandbox Code Playgroud)
一个完整的程序是通过将一个名为
mainpackage 的单个无转换包与它所导入的所有包传递链接而创建的.该main包中必须包名称main和声明函数main,它没有参数和返回值.Run Code Online (Sandbox Code Playgroud)func main() { … }程序执行从初始化
main包然后调用该函数开始main.函数main返回时,程序退出.它不等待其他(非main)goroutines完成.
您没有向我们提供有关您问题的简单,简洁,可编译和可执行的示例.这是您的代码的精简版本.
package main
import (
"fmt"
"strconv"
"sync"
"time"
)
// structs
type Reading struct {
Id string
Name string
}
var waitGroup sync.WaitGroup
func main() {
// Setup timer
startTime := time.Now()
// Setup readings
readings := prepareReadings()
fmt.Println("readings prepared: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
// Insert readings
for i := 1; i <= 1000000; i++ {
waitGroup.Add(1)
go insertReadings(readings)
// fmt.Print(".")
if i%100000 == 0 {
fmt.Println("100000 readings queued for insert: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}
}
waitGroup.Wait()
fmt.Println("all readings inserted: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}
func insertReadings(readings []Reading) {
waitGroup.Done()
}
func prepareReadings() []Reading {
var readings []Reading
for i := 1; i <= 1; i++ {
readings = append(readings, Reading{Name: "Thing"})
}
return readings
}
Run Code Online (Sandbox Code Playgroud)
输出:
readings prepared: 0.00
100000 readings queued for insert: 0.49
100000 readings queued for insert: 1.12
100000 readings queued for insert: 1.62
100000 readings queued for insert: 2.54
100000 readings queued for insert: 3.05
100000 readings queued for insert: 3.56
100000 readings queued for insert: 4.06
100000 readings queued for insert: 5.57
100000 readings queued for insert: 7.15
100000 readings queued for insert: 8.78
all readings inserted: 34.76
Run Code Online (Sandbox Code Playgroud)
现在,逐个构建程序,并查看它开始失败的位置.
| 归档时间: |
|
| 查看次数: |
1123 次 |
| 最近记录: |