使用 Kafka-Go,为什么我看到的是批处理读/写?有没有我缺少的配置?

Cha*_*nce 6 go apache-kafka

我将从 RabbitMQ 切换到 Kafka。这只是一个简单的秒杀,看看 Kafka 是如何运作的。我不确定是否有我遗漏的设置,是否是我的代码,是否是 Kafka-Go,或者这是预期的 Kafka 行为。

我试过调整BatchSize和 ,BatchTimeout但都没有影响。

下面的代码创建了一个具有 6 个分区和 3 个复制因子的主题。然后它会产生一个递增的消息100ms。它启动 6 个消费者,每个分区一个。读取和写入都是在 go 例程中执行的。

在下面的日志中,它持续 7 秒没有收到消息,然后收到突发。我正在使用 Confluent 的平台,所以我意识到会有一些网络延迟,但没有达到我所看到的程度。

package main

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "log"
    "net"
    "strconv"
    "time"

    kafka "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/plain"
)

func newDialer(clientID, username, password string) *kafka.Dialer {
    mechanism := plain.Mechanism{
        Username: username,
        Password: password,
    }

    rootCAs, _ := x509.SystemCertPool()
    if rootCAs == nil {
        rootCAs = x509.NewCertPool()
    }

    return &kafka.Dialer{
        Timeout:       10 * time.Second,
        DualStack:     true,
        ClientID:      clientID,
        SASLMechanism: mechanism,
        TLS: &tls.Config{
            InsecureSkipVerify: false,
            RootCAs:            rootCAs,
        },
    }
}

func createTopic(url string, topic string, dialer *kafka.Dialer) {
    conn, err := dialer.Dial("tcp", url)
    if err != nil {
        panic(err.Error())
    }
    defer conn.Close()

    controller, err := conn.Controller()
    if err != nil {
        panic(err.Error())
    }

    var controllerConn *kafka.Conn
    controllerConn, err = dialer.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    if err != nil {
        panic(err.Error())
    }
    defer controllerConn.Close()

    topicConfigs := []kafka.TopicConfig{
        {
            Topic:             topic,
            NumPartitions:     6,
            ReplicationFactor: 3,
        },
    }

    err = controllerConn.CreateTopics(topicConfigs...)
    if err != nil {
        panic(err.Error())
    }

}

func newWriter(url string, topic string, dialer *kafka.Dialer) *kafka.Writer {
    return kafka.NewWriter(kafka.WriterConfig{
        Brokers:      []string{url},
        Topic:        topic,
        Balancer:     &kafka.CRC32Balancer{},
        Dialer:       dialer,
        BatchSize:    10,
        BatchTimeout: 1 * time.Millisecond,
    })
}

func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {

    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{url},
        Topic:     topic,
        Dialer:    dialer,
        Partition: partition,
    })
}

func read(url string, topic string, dialer *kafka.Dialer, partition int) {

    reader := newReader(url, topic, partition, dialer)
    defer reader.Close()
    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            panic(err)
        }
        log.Printf("rec%d:\t%s\n", partition, msg.Value)
    }
}
func write(url string, topic string, dialer *kafka.Dialer) {
    writer := newWriter(url, topic, dialer)
    defer writer.Close()
    for i := 0; ; i++ {
        v := []byte("V" + strconv.Itoa(i))
        log.Printf("send:\t%s\n", v)
        msg := kafka.Message{ Key: v, Value: v }
        err := writer.WriteMessages(context.Background(), msg)
        if err != nil {
            fmt.Println(err)
        }
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    url := "_______.______.___.confluent.cloud:9092"
    topic := "test"
    username := "________________"
    password := "________________"
    clientID := "________________"
    dialer := newDialer(clientID, username, password)
    ctx := context.Background()
    createTopic(url, topic, dialer)
    for i := 0; i < 6; i++ {
        go read(url, topic, dialer, i)
    }

    go write(url, topic, dialer)
    <-ctx.Done()
}

Run Code Online (Sandbox Code Playgroud)

正在记录以下内容。

2020/11/02 23:19:22 send:       V0
2020/11/02 23:19:23 send:       V1
2020/11/02 23:19:23 send:       V2
2020/11/02 23:19:23 send:       V3
2020/11/02 23:19:24 send:       V4
2020/11/02 23:19:24 send:       V5
2020/11/02 23:19:24 send:       V6
2020/11/02 23:19:25 send:       V7
2020/11/02 23:19:25 send:       V8
2020/11/02 23:19:25 send:       V9
2020/11/02 23:19:25 send:       V10
2020/11/02 23:19:26 send:       V11
2020/11/02 23:19:26 send:       V12
2020/11/02 23:19:26 send:       V13
2020/11/02 23:19:26 send:       V14
2020/11/02 23:19:26 send:       V15
2020/11/02 23:19:27 send:       V16
2020/11/02 23:19:27 send:       V17
2020/11/02 23:19:27 send:       V18
2020/11/02 23:19:27 send:       V19
2020/11/02 23:19:28 send:       V20
2020/11/02 23:19:29 send:       V21
2020/11/02 23:19:29 send:       V22
2020/11/02 23:19:29 send:       V23
2020/11/02 23:19:29 send:       V24
2020/11/02 23:19:29 send:       V25
2020/11/02 23:19:30 send:       V26
2020/11/02 23:19:30 send:       V27
2020/11/02 23:19:30 send:       V28
2020/11/02 23:19:30 send:       V29
2020/11/02 23:19:31 send:       V30
2020/11/02 23:19:31 send:       V31
2020/11/02 23:19:31 send:       V32
2020/11/02 23:19:32 send:       V33
2020/11/02 23:19:32 send:       V34
2020/11/02 23:19:32 rec3:       V8
2020/11/02 23:19:32 rec3:       V14
2020/11/02 23:19:32 rec3:       V15
2020/11/02 23:19:32 rec3:       V16
2020/11/02 23:19:32 rec3:       V17
2020/11/02 23:19:32 rec3:       V20
2020/11/02 23:19:32 rec3:       V21
2020/11/02 23:19:32 rec3:       V23
2020/11/02 23:19:32 rec3:       V29
2020/11/02 23:19:32 rec1:       V0
2020/11/02 23:19:32 rec1:       V9
2020/11/02 23:19:32 rec1:       V22
2020/11/02 23:19:32 rec1:       V28
2020/11/02 23:19:32 rec4:       V4
2020/11/02 23:19:32 rec4:       V5
2020/11/02 23:19:32 rec4:       V7
2020/11/02 23:19:32 rec4:       V10
2020/11/02 23:19:32 rec4:       V11
2020/11/02 23:19:32 rec4:       V12
2020/11/02 23:19:32 rec4:       V18
2020/11/02 23:19:32 rec4:       V24
2020/11/02 23:19:32 rec4:       V25
2020/11/02 23:19:32 rec4:       V30
2020/11/02 23:19:32 rec4:       V31
2020/11/02 23:19:32 send:       V35
2020/11/02 23:19:32 rec5:       V1
2020/11/02 23:19:32 rec5:       V2
2020/11/02 23:19:32 rec5:       V3
2020/11/02 23:19:32 rec5:       V34
2020/11/02 23:19:32 rec2:       V6
2020/11/02 23:19:32 rec2:       V13
2020/11/02 23:19:32 rec2:       V26
2020/11/02 23:19:32 rec2:       V33
2020/11/02 23:19:32 send:       V36
2020/11/02 23:19:33 send:       V37
2020/11/02 23:19:33 send:       V38
2020/11/02 23:19:33 send:       V39
2020/11/02 23:19:33 send:       V40
2020/11/02 23:19:33 send:       V41
2020/11/02 23:19:33 rec0:       V19
2020/11/02 23:19:33 rec0:       V27
2020/11/02 23:19:33 rec0:       V32
2020/11/02 23:19:34 send:       V42
2020/11/02 23:19:34 send:       V43
2020/11/02 23:19:34 send:       V44
2020/11/02 23:19:34 send:       V45
2020/11/02 23:19:34 send:       V46
2020/11/02 23:19:35 send:       V47
2020/11/02 23:19:35 send:       V48
2020/11/02 23:19:35 send:       V49
2020/11/02 23:19:35 send:       V50
2020/11/02 23:19:35 send:       V51
2020/11/02 23:19:35 send:       V52
2020/11/02 23:19:36 send:       V53
2020/11/02 23:19:36 send:       V54
2020/11/02 23:19:36 send:       V55
2020/11/02 23:19:36 send:       V56
2020/11/02 23:19:36 send:       V57
2020/11/02 23:19:37 send:       V58
2020/11/02 23:19:37 send:       V59
2020/11/02 23:19:37 send:       V60
2020/11/02 23:19:38 send:       V61
2020/11/02 23:19:38 send:       V62
2020/11/02 23:19:38 send:       V63
2020/11/02 23:19:38 send:       V64
2020/11/02 23:19:38 send:       V65
2020/11/02 23:19:39 send:       V66
2020/11/02 23:19:39 send:       V67
2020/11/02 23:19:39 send:       V68
2020/11/02 23:19:40 send:       V69
2020/11/02 23:19:40 send:       V70
2020/11/02 23:19:40 send:       V71
2020/11/02 23:19:40 send:       V72
2020/11/02 23:19:40 send:       V73
2020/11/02 23:19:40 send:       V74
2020/11/02 23:19:41 send:       V75
2020/11/02 23:19:41 send:       V76
2020/11/02 23:19:41 rec1:       V41
2020/11/02 23:19:41 rec1:       V56
2020/11/02 23:19:41 rec1:       V68
2020/11/02 23:19:41 rec1:       V74
2020/11/02 23:19:41 rec1:       V75
2020/11/02 23:19:41 rec1:       V76
2020/11/02 23:19:41 rec3:       V37
2020/11/02 23:19:41 rec3:       V40
2020/11/02 23:19:41 rec3:       V42
2020/11/02 23:19:41 rec3:       V48
2020/11/02 23:19:41 rec3:       V55
2020/11/02 23:19:41 rec3:       V57
2020/11/02 23:19:41 rec3:       V60
2020/11/02 23:19:41 rec3:       V61
2020/11/02 23:19:41 rec3:       V62
2020/11/02 23:19:41 send:       V77
2020/11/02 23:19:41 rec4:       V38
2020/11/02 23:19:41 rec4:       V39
2020/11/02 23:19:41 rec4:       V45
2020/11/02 23:19:41 rec4:       V46
2020/11/02 23:19:41 rec4:       V47
2020/11/02 23:19:41 rec4:       V53
2020/11/02 23:19:41 rec4:       V59
2020/11/02 23:19:41 rec4:       V70
2020/11/02 23:19:41 rec4:       V71
2020/11/02 23:19:41 rec4:       V73
2020/11/02 23:19:41 rec5:       V35
2020/11/02 23:19:41 rec5:       V36
2020/11/02 23:19:41 rec5:       V43
2020/11/02 23:19:41 rec5:       V49
2020/11/02 23:19:41 rec5:       V54
2020/11/02 23:19:41 rec5:       V63
2020/11/02 23:19:41 rec5:       V69
2020/11/02 23:19:41 rec5:       V77
2020/11/02 23:19:41 send:       V78
2020/11/02 23:19:41 rec2:       V44
2020/11/02 23:19:41 rec2:       V50
2020/11/02 23:19:41 rec2:       V51
2020/11/02 23:19:41 rec2:       V64
2020/11/02 23:19:41 rec2:       V65
2020/11/02 23:19:41 rec2:       V66
2020/11/02 23:19:41 rec2:       V72
2020/11/02 23:19:41 send:       V79
2020/11/02 23:19:42 send:       V80
2020/11/02 23:19:42 send:       V81
2020/11/02 23:19:42 send:       V82
2020/11/02 23:19:42 send:       V83
2020/11/02 23:19:42 send:       V84
2020/11/02 23:19:43 send:       V85
2020/11/02 23:19:43 rec0:       V52
2020/11/02 23:19:43 rec0:       V58
2020/11/02 23:19:43 rec0:       V67
2020/11/02 23:19:43 send:       V86
Run Code Online (Sandbox Code Playgroud)

任何建议将不胜感激。谢谢!

编辑:

缓冲肯定只是在 Kafka-Go 中发生。Sarama 不会遇到相同的行为:

2020/11/02 23:19:22 send:       V0
2020/11/02 23:19:23 send:       V1
2020/11/02 23:19:23 send:       V2
2020/11/02 23:19:23 send:       V3
2020/11/02 23:19:24 send:       V4
2020/11/02 23:19:24 send:       V5
2020/11/02 23:19:24 send:       V6
2020/11/02 23:19:25 send:       V7
2020/11/02 23:19:25 send:       V8
2020/11/02 23:19:25 send:       V9
2020/11/02 23:19:25 send:       V10
2020/11/02 23:19:26 send:       V11
2020/11/02 23:19:26 send:       V12
2020/11/02 23:19:26 send:       V13
2020/11/02 23:19:26 send:       V14
2020/11/02 23:19:26 send:       V15
2020/11/02 23:19:27 send:       V16
2020/11/02 23:19:27 send:       V17
2020/11/02 23:19:27 send:       V18
2020/11/02 23:19:27 send:       V19
2020/11/02 23:19:28 send:       V20
2020/11/02 23:19:29 send:       V21
2020/11/02 23:19:29 send:       V22
2020/11/02 23:19:29 send:       V23
2020/11/02 23:19:29 send:       V24
2020/11/02 23:19:29 send:       V25
2020/11/02 23:19:30 send:       V26
2020/11/02 23:19:30 send:       V27
2020/11/02 23:19:30 send:       V28
2020/11/02 23:19:30 send:       V29
2020/11/02 23:19:31 send:       V30
2020/11/02 23:19:31 send:       V31
2020/11/02 23:19:31 send:       V32
2020/11/02 23:19:32 send:       V33
2020/11/02 23:19:32 send:       V34
2020/11/02 23:19:32 rec3:       V8
2020/11/02 23:19:32 rec3:       V14
2020/11/02 23:19:32 rec3:       V15
2020/11/02 23:19:32 rec3:       V16
2020/11/02 23:19:32 rec3:       V17
2020/11/02 23:19:32 rec3:       V20
2020/11/02 23:19:32 rec3:       V21
2020/11/02 23:19:32 rec3:       V23
2020/11/02 23:19:32 rec3:       V29
2020/11/02 23:19:32 rec1:       V0
2020/11/02 23:19:32 rec1:       V9
2020/11/02 23:19:32 rec1:       V22
2020/11/02 23:19:32 rec1:       V28
2020/11/02 23:19:32 rec4:       V4
2020/11/02 23:19:32 rec4:       V5
2020/11/02 23:19:32 rec4:       V7
2020/11/02 23:19:32 rec4:       V10
2020/11/02 23:19:32 rec4:       V11
2020/11/02 23:19:32 rec4:       V12
2020/11/02 23:19:32 rec4:       V18
2020/11/02 23:19:32 rec4:       V24
2020/11/02 23:19:32 rec4:       V25
2020/11/02 23:19:32 rec4:       V30
2020/11/02 23:19:32 rec4:       V31
2020/11/02 23:19:32 send:       V35
2020/11/02 23:19:32 rec5:       V1
2020/11/02 23:19:32 rec5:       V2
2020/11/02 23:19:32 rec5:       V3
2020/11/02 23:19:32 rec5:       V34
2020/11/02 23:19:32 rec2:       V6
2020/11/02 23:19:32 rec2:       V13
2020/11/02 23:19:32 rec2:       V26
2020/11/02 23:19:32 rec2:       V33
2020/11/02 23:19:32 send:       V36
2020/11/02 23:19:33 send:       V37
2020/11/02 23:19:33 send:       V38
2020/11/02 23:19:33 send:       V39
2020/11/02 23:19:33 send:       V40
2020/11/02 23:19:33 send:       V41
2020/11/02 23:19:33 rec0:       V19
2020/11/02 23:19:33 rec0:       V27
2020/11/02 23:19:33 rec0:       V32
2020/11/02 23:19:34 send:       V42
2020/11/02 23:19:34 send:       V43
2020/11/02 23:19:34 send:       V44
2020/11/02 23:19:34 send:       V45
2020/11/02 23:19:34 send:       V46
2020/11/02 23:19:35 send:       V47
2020/11/02 23:19:35 send:       V48
2020/11/02 23:19:35 send:       V49
2020/11/02 23:19:35 send:       V50
2020/11/02 23:19:35 send:       V51
2020/11/02 23:19:35 send:       V52
2020/11/02 23:19:36 send:       V53
2020/11/02 23:19:36 send:       V54
2020/11/02 23:19:36 send:       V55
2020/11/02 23:19:36 send:       V56
2020/11/02 23:19:36 send:       V57
2020/11/02 23:19:37 send:       V58
2020/11/02 23:19:37 send:       V59
2020/11/02 23:19:37 send:       V60
2020/11/02 23:19:38 send:       V61
2020/11/02 23:19:38 send:       V62
2020/11/02 23:19:38 send:       V63
2020/11/02 23:19:38 send:       V64
2020/11/02 23:19:38 send:       V65
2020/11/02 23:19:39 send:       V66
2020/11/02 23:19:39 send:       V67
2020/11/02 23:19:39 send:       V68
2020/11/02 23:19:40 send:       V69
2020/11/02 23:19:40 send:       V70
2020/11/02 23:19:40 send:       V71
2020/11/02 23:19:40 send:       V72
2020/11/02 23:19:40 send:       V73
2020/11/02 23:19:40 send:       V74
2020/11/02 23:19:41 send:       V75
2020/11/02 23:19:41 send:       V76
2020/11/02 23:19:41 rec1:       V41
2020/11/02 23:19:41 rec1:       V56
2020/11/02 23:19:41 rec1:       V68
2020/11/02 23:19:41 rec1:       V74
2020/11/02 23:19:41 rec1:       V75
2020/11/02 23:19:41 rec1:       V76
2020/11/02 23:19:41 rec3:       V37
2020/11/02 23:19:41 rec3:       V40
2020/11/02 23:19:41 rec3:       V42
2020/11/02 23:19:41 rec3:       V48
2020/11/02 23:19:41 rec3:       V55
2020/11/02 23:19:41 rec3:       V57
2020/11/02 23:19:41 rec3:       V60
2020/11/02 23:19:41 rec3:       V61
2020/11/02 23:19:41 rec3:       V62
2020/11/02 23:19:41 send:       V77
2020/11/02 23:19:41 rec4:       V38
2020/11/02 23:19:41 rec4:       V39
2020/11/02 23:19:41 rec4:       V45
2020/11/02 23:19:41 rec4:       V46
2020/11/02 23:19:41 rec4:       V47
2020/11/02 23:19:41 rec4:       V53
2020/11/02 23:19:41 rec4:       V59
2020/11/02 23:19:41 rec4:       V70
2020/11/02 23:19:41 rec4:       V71
2020/11/02 23:19:41 rec4:       V73
2020/11/02 23:19:41 rec5:       V35
2020/11/02 23:19:41 rec5:       V36
2020/11/02 23:19:41 rec5:       V43
2020/11/02 23:19:41 rec5:       V49
2020/11/02 23:19:41 rec5:       V54
2020/11/02 23:19:41 rec5:       V63
2020/11/02 23:19:41 rec5:       V69
2020/11/02 23:19:41 rec5:       V77
2020/11/02 23:19:41 send:       V78
2020/11/02 23:19:41 rec2:       V44
2020/11/02 23:19:41 rec2:       V50
2020/11/02 23:19:41 rec2:       V51
2020/11/02 23:19:41 rec2:       V64
2020/11/02 23:19:41 rec2:       V65
2020/11/02 23:19:41 rec2:       V66
2020/11/02 23:19:41 rec2:       V72
2020/11/02 23:19:41 send:       V79
2020/11/02 23:19:42 send:       V80
2020/11/02 23:19:42 send:       V81
2020/11/02 23:19:42 send:       V82
2020/11/02 23:19:42 send:       V83
2020/11/02 23:19:42 send:       V84
2020/11/02 23:19:43 send:       V85
2020/11/02 23:19:43 rec0:       V52
2020/11/02 23:19:43 rec0:       V58
2020/11/02 23:19:43 rec0:       V67
2020/11/02 23:19:43 send:       V86
Run Code Online (Sandbox Code Playgroud)

事实上,在某些情况下,它在确认发送之前实际接收,这让我想知道是否有内部消息发生,如果是,我是否应该关心......

recv:           V1176   p: 1    offset: 355
recv:           V1177   p: 2    offset: 363
send:           V1177   p: 2    offset: 363
send:           V1178   p: 5    offset: 377
recv:           V1178   p: 5    offset: 377
recv:           V1179   p: 1    offset: 356
send:           V1179   p: 1    offset: 356
send:           V1180   p: 1    offset: 357
recv:           V1180   p: 1    offset: 357
recv:           V1181   p: 1    offset: 358
send:           V1181   p: 1    offset: 358
send:           V1182   p: 4    offset: 393
recv:           V1182   p: 4    offset: 393
send:           V1183   p: 4    offset: 394
recv:           V1183   p: 4    offset: 394
send:           V1184   p: 3    offset: 358
recv:           V1184   p: 3    offset: 358
send:           V1185   p: 2    offset: 364
recv:           V1185   p: 2    offset: 364
send:           V1186   p: 3    offset: 359
recv:           V1186   p: 3    offset: 359
recv:           V1187   p: 3    offset: 360
send:           V1187   p: 3    offset: 360
send:           V1188   p: 5    offset: 378
recv:           V1188   p: 5    offset: 378
send:           V1189   p: 2    offset: 365
recv:           V1189   p: 2    offset: 365
recv:           V1190   p: 4    offset: 395
send:           V1190   p: 4    offset: 395
send:           V1191   p: 1    offset: 359
recv:           V1191   p: 1    offset: 359
send:           V1192   p: 4    offset: 396
recv:           V1192   p: 4    offset: 396
send:           V1193   p: 0    offset: 431
recv:           V1193   p: 0    offset: 431
send:           V1194   p: 4    offset: 397
recv:           V1194   p: 4    offset: 397
recv:           V1195   p: 2    offset: 366
send:           V1195   p: 2    offset: 366
send:           V1196   p: 3    offset: 361
recv:           V1196   p: 3    offset: 361
Run Code Online (Sandbox Code Playgroud)

pie*_*ipi 3

您需要更改ReaderConfig.MinBytes,否则segmentio/kafka-go将其设置为1e6 = 1 MB,在这种情况下,Kafka 将等待那么多数据积累后再响应请求。

func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {

    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{url},
        Topic:     topic,
        Dialer:    dialer,
        Partition: partition,
        MinBytes:  1,         // same value of Shopify/sarama 
        MaxBytes:  57671680,
    })
}
Run Code Online (Sandbox Code Playgroud)

另一方面,默认值shopify/sarama是 1 个字节。

参考: