NATS JetStream exactly-once delivery

bul*_*ark 6 message-queue go nats.io

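I want to implement an exactly-once delivery system using NATS JetStream. The documentation says JetStream has this option, but it gives no examples or details about how it works or how clients should implement it. I know that on the publisher side we can set a MsgId and specify the duplicate window when creating the stream, but what about the consumer side?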

Byr*_*uth 5

Here are the docs for exactly-once delivery. This is a bit of a misnomer since what is actually needed (and what this feature provides) is exactly-once processing.

As you point out, it is a combination of de-duplication by the server when receiving a published message as well as a double ack call by the subscription that had received the message (plus retries if necessary).

Here is an example (excess error handling elided for brevity). Start the server with JetStream enabled (nats-server --js), then run this code (it assumes nats.go v1.16+).

package main

import (
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func failOnErr(err error) {
    if err != nil {
        log.Fatal(err)
    }
}

func main() {
    // Connect and get the JetStream context.
    nc, err := nats.Connect(nats.DefaultURL)
    failOnErr(err)
    defer nc.Close()

    js, err := nc.JetStream()
    failOnErr(err)

    // Create a test stream with a one-minute de-duplication window.
    _, err = js.AddStream(&nats.StreamConfig{
        Name:       "test",
        Storage:    nats.MemoryStorage,
        Subjects:   []string{"test.>"},
        Duplicates: time.Minute,
    })
    failOnErr(err)

    defer js.DeleteStream("test")

    // Publish some messages with duplicates.
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))

    // Create an explicit pull consumer on the stream.
    _, err = js.AddConsumer("test", &nats.ConsumerConfig{
        Durable:       "test",
        AckPolicy:     nats.AckExplicitPolicy,
        DeliverPolicy: nats.DeliverAllPolicy,
    })
    failOnErr(err)
    defer js.DeleteConsumer("test", "test")

    // Create a subscription on the pull consumer.
    // Subject can be empty since it defaults to all subjects bound to the stream.
    sub, err := js.PullSubscribe("", "test", nats.BindStream("test"))
    failOnErr(err)

    // Only two should be delivered.
    batch, _ := sub.Fetch(10)
    log.Printf("%d messages", len(batch))

    // AckSync each message to ensure the server received the ack.
    for _, msg := range batch {
        failOnErr(msg.AckSync())
    }

    // Should be zero.
    batch, _ = sub.Fetch(10, nats.MaxWait(time.Second))
    log.Printf("%d messages", len(batch))
}

It is worth noting that if an AckSync does fail (it can return an error), it is up to the client code to retry the ack until a response is received. A redundant ack from the client is a no-op.