观察MongoDB Change Streams

vah*_*det 3 event-handling go mongodb mgo mongo-go

我们希望Go应用程序监听集合上的数据更改.因此,谷歌搜索解决方案时,我们遇到了MongoDB的Change Streams.该链接还展示了一些语言的一些实现片段,如Python,Java,Nodejs等.但是,Go没有任何代码.

我们使用Mgo作为驱动程序,但无法找到有关更改流的明确声明.

有没有人知道如何使用Mgo或Go的任何其他Mongo驱动程序观看Change Streams?

icz*_*cza 16

由Gustavo Niemeyer开发的流行mgo驱动程序(github.com/go-mgo/mgo)已经变暗(未维护).它不支持变更流.

社区支持的fork github.com/globalsign/mgo更好,并且已经添加了对更改流的支持(请参阅此处的详细信息).

要观察集合的更改,只需使用Collection.Watch()返回值的方法mgo.ChangeStream.这是一个使用它的简单示例:

coll := ... // Obtain collection

pipeline := []bson.M{}

changeStream := coll.Watch(pipeline, mgo.ChangeStreamOptions{})
var changeDoc bson.M
for changeStream.Next(&changeDoc) {
    fmt.Printf("Change: %v\n", changeDoc)
}

if err := changeStream.Close(); err != nil {
    return err
}
Run Code Online (Sandbox Code Playgroud)

另请注意,正在开发一个官方的 MongoDB Go驱动程序,它在这里宣布:考虑引入官方MongoDB Go驱动程序的社区效应

它目前处于alpha(!!)阶段,因此请考虑这一点.它在这里可用:github.com/mongodb/mongo-go-driver.它也已经支持变更流,类似地通过该Collection.Watch()方法(这是一种不同的mongo.Collection类型,它与之无关mgo.Collection).它返回一个mongo.Cursor你可以这样使用的:

var coll mongo.Collection = ... // Obtain collection

ctx := context.Background()

var pipeline interface{} // set up pipeline

cur, err := coll.Watch(ctx, pipeline)
if err != nil {
    // Handle err
    return
}
defer cur.Close(ctx)

for cur.Next(ctx) {
    elem := bson.NewDocument()
    if err := cur.Decode(elem); err != nil {
        log.Fatal(err)
    }

    // do something with elem....
}

if err := cur.Err(); err != nil {
    log.Fatal(err)
}
Run Code Online (Sandbox Code Playgroud)

  • 很好的答案,值得注意的建议.谢谢! (2认同)

小智 5

此示例使用MongoDB 支持的 Go 驱动程序和流管道(仅过滤具有 field1=1 和 field2=false 的文档):

    ctx := context.TODO()
    clientOptions := options.Client().ApplyURI(mongoURI)
    client, err := mongo.Connect(ctx, clientOptions)
    if err != nil {
        log.Fatal(err)
    }
    err = client.Ping(ctx, nil)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println("Connected!")

    collection := client.Database("test").Collection("test")

    pipeline := mongo.Pipeline{bson.D{
        {"$match",
            bson.D{
                {"fullDocument.field1", 1},
                {"fullDocument.field2", false},
            },
        },
    }}
    streamOptions := options.ChangeStream().SetFullDocument(options.UpdateLookup)

    stream, err := collection.Watch(ctx, pipeline, streamOptions)
    if err != nil {
        log.Fatal(err)
    }
    log.Print("waiting for changes")
    var changeDoc map[string]interface{}
    for stream.Next(ctx) {
        if e := stream.Decode(&changeDoc); e != nil {
            log.Printf("error decoding: %s", e)
        }
        log.Printf("change: %+v", changeDoc)
    }
Run Code Online (Sandbox Code Playgroud)