我将从 RabbitMQ 切换到 Kafka。这只是一个简单的秒杀,看看 Kafka 是如何运作的。我不确定是否有我遗漏的设置,是否是我的代码,是否是 Kafka-Go,或者这是预期的 Kafka 行为。
我试过调整BatchSize
和 ,BatchTimeout
但都没有影响。
下面的代码创建了一个具有 6 个分区和 3 个复制因子的主题。然后它会产生一个递增的消息100ms
。它启动 6 个消费者,每个分区一个。读取和写入都是在 go 例程中执行的。
在下面的日志中,它持续 7 秒没有收到消息,然后收到突发。我正在使用 Confluent 的平台,所以我意识到会有一些网络延迟,但没有达到我所看到的程度。
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"log"
"net"
"strconv"
"time"
kafka "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
)
func newDialer(clientID, username, password string) *kafka.Dialer {
mechanism := plain.Mechanism{
Username: username,
Password: password,
}
rootCAs, _ := x509.SystemCertPool()
if rootCAs == nil {
rootCAs = x509.NewCertPool()
}
return &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
ClientID: clientID,
SASLMechanism: mechanism,
TLS: &tls.Config{
InsecureSkipVerify: false,
RootCAs: rootCAs,
},
}
}
func createTopic(url string, topic string, dialer *kafka.Dialer) {
conn, err := dialer.Dial("tcp", url)
if err != nil {
panic(err.Error())
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = dialer.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{
{
Topic: topic,
NumPartitions: 6,
ReplicationFactor: 3,
},
}
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
panic(err.Error())
}
}
func newWriter(url string, topic string, dialer *kafka.Dialer) *kafka.Writer {
return kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{url},
Topic: topic,
Balancer: &kafka.CRC32Balancer{},
Dialer: dialer,
BatchSize: 10,
BatchTimeout: 1 * time.Millisecond,
})
}
func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{url},
Topic: topic,
Dialer: dialer,
Partition: partition,
})
}
func read(url string, topic string, dialer *kafka.Dialer, partition int) {
reader := newReader(url, topic, partition, dialer)
defer reader.Close()
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
panic(err)
}
log.Printf("rec%d:\t%s\n", partition, msg.Value)
}
}
func write(url string, topic string, dialer *kafka.Dialer) {
writer := newWriter(url, topic, dialer)
defer writer.Close()
for i := 0; ; i++ {
v := []byte("V" + strconv.Itoa(i))
log.Printf("send:\t%s\n", v)
msg := kafka.Message{ Key: v, Value: v }
err := writer.WriteMessages(context.Background(), msg)
if err != nil {
fmt.Println(err)
}
time.Sleep(100 * time.Millisecond)
}
}
func main() {
url := "_______.______.___.confluent.cloud:9092"
topic := "test"
username := "________________"
password := "________________"
clientID := "________________"
dialer := newDialer(clientID, username, password)
ctx := context.Background()
createTopic(url, topic, dialer)
for i := 0; i < 6; i++ {
go read(url, topic, dialer, i)
}
go write(url, topic, dialer)
<-ctx.Done()
}
Run Code Online (Sandbox Code Playgroud)
正在记录以下内容。
2020/11/02 23:19:22 send: V0
2020/11/02 23:19:23 send: V1
2020/11/02 23:19:23 send: V2
2020/11/02 23:19:23 send: V3
2020/11/02 23:19:24 send: V4
2020/11/02 23:19:24 send: V5
2020/11/02 23:19:24 send: V6
2020/11/02 23:19:25 send: V7
2020/11/02 23:19:25 send: V8
2020/11/02 23:19:25 send: V9
2020/11/02 23:19:25 send: V10
2020/11/02 23:19:26 send: V11
2020/11/02 23:19:26 send: V12
2020/11/02 23:19:26 send: V13
2020/11/02 23:19:26 send: V14
2020/11/02 23:19:26 send: V15
2020/11/02 23:19:27 send: V16
2020/11/02 23:19:27 send: V17
2020/11/02 23:19:27 send: V18
2020/11/02 23:19:27 send: V19
2020/11/02 23:19:28 send: V20
2020/11/02 23:19:29 send: V21
2020/11/02 23:19:29 send: V22
2020/11/02 23:19:29 send: V23
2020/11/02 23:19:29 send: V24
2020/11/02 23:19:29 send: V25
2020/11/02 23:19:30 send: V26
2020/11/02 23:19:30 send: V27
2020/11/02 23:19:30 send: V28
2020/11/02 23:19:30 send: V29
2020/11/02 23:19:31 send: V30
2020/11/02 23:19:31 send: V31
2020/11/02 23:19:31 send: V32
2020/11/02 23:19:32 send: V33
2020/11/02 23:19:32 send: V34
2020/11/02 23:19:32 rec3: V8
2020/11/02 23:19:32 rec3: V14
2020/11/02 23:19:32 rec3: V15
2020/11/02 23:19:32 rec3: V16
2020/11/02 23:19:32 rec3: V17
2020/11/02 23:19:32 rec3: V20
2020/11/02 23:19:32 rec3: V21
2020/11/02 23:19:32 rec3: V23
2020/11/02 23:19:32 rec3: V29
2020/11/02 23:19:32 rec1: V0
2020/11/02 23:19:32 rec1: V9
2020/11/02 23:19:32 rec1: V22
2020/11/02 23:19:32 rec1: V28
2020/11/02 23:19:32 rec4: V4
2020/11/02 23:19:32 rec4: V5
2020/11/02 23:19:32 rec4: V7
2020/11/02 23:19:32 rec4: V10
2020/11/02 23:19:32 rec4: V11
2020/11/02 23:19:32 rec4: V12
2020/11/02 23:19:32 rec4: V18
2020/11/02 23:19:32 rec4: V24
2020/11/02 23:19:32 rec4: V25
2020/11/02 23:19:32 rec4: V30
2020/11/02 23:19:32 rec4: V31
2020/11/02 23:19:32 send: V35
2020/11/02 23:19:32 rec5: V1
2020/11/02 23:19:32 rec5: V2
2020/11/02 23:19:32 rec5: V3
2020/11/02 23:19:32 rec5: V34
2020/11/02 23:19:32 rec2: V6
2020/11/02 23:19:32 rec2: V13
2020/11/02 23:19:32 rec2: V26
2020/11/02 23:19:32 rec2: V33
2020/11/02 23:19:32 send: V36
2020/11/02 23:19:33 send: V37
2020/11/02 23:19:33 send: V38
2020/11/02 23:19:33 send: V39
2020/11/02 23:19:33 send: V40
2020/11/02 23:19:33 send: V41
2020/11/02 23:19:33 rec0: V19
2020/11/02 23:19:33 rec0: V27
2020/11/02 23:19:33 rec0: V32
2020/11/02 23:19:34 send: V42
2020/11/02 23:19:34 send: V43
2020/11/02 23:19:34 send: V44
2020/11/02 23:19:34 send: V45
2020/11/02 23:19:34 send: V46
2020/11/02 23:19:35 send: V47
2020/11/02 23:19:35 send: V48
2020/11/02 23:19:35 send: V49
2020/11/02 23:19:35 send: V50
2020/11/02 23:19:35 send: V51
2020/11/02 23:19:35 send: V52
2020/11/02 23:19:36 send: V53
2020/11/02 23:19:36 send: V54
2020/11/02 23:19:36 send: V55
2020/11/02 23:19:36 send: V56
2020/11/02 23:19:36 send: V57
2020/11/02 23:19:37 send: V58
2020/11/02 23:19:37 send: V59
2020/11/02 23:19:37 send: V60
2020/11/02 23:19:38 send: V61
2020/11/02 23:19:38 send: V62
2020/11/02 23:19:38 send: V63
2020/11/02 23:19:38 send: V64
2020/11/02 23:19:38 send: V65
2020/11/02 23:19:39 send: V66
2020/11/02 23:19:39 send: V67
2020/11/02 23:19:39 send: V68
2020/11/02 23:19:40 send: V69
2020/11/02 23:19:40 send: V70
2020/11/02 23:19:40 send: V71
2020/11/02 23:19:40 send: V72
2020/11/02 23:19:40 send: V73
2020/11/02 23:19:40 send: V74
2020/11/02 23:19:41 send: V75
2020/11/02 23:19:41 send: V76
2020/11/02 23:19:41 rec1: V41
2020/11/02 23:19:41 rec1: V56
2020/11/02 23:19:41 rec1: V68
2020/11/02 23:19:41 rec1: V74
2020/11/02 23:19:41 rec1: V75
2020/11/02 23:19:41 rec1: V76
2020/11/02 23:19:41 rec3: V37
2020/11/02 23:19:41 rec3: V40
2020/11/02 23:19:41 rec3: V42
2020/11/02 23:19:41 rec3: V48
2020/11/02 23:19:41 rec3: V55
2020/11/02 23:19:41 rec3: V57
2020/11/02 23:19:41 rec3: V60
2020/11/02 23:19:41 rec3: V61
2020/11/02 23:19:41 rec3: V62
2020/11/02 23:19:41 send: V77
2020/11/02 23:19:41 rec4: V38
2020/11/02 23:19:41 rec4: V39
2020/11/02 23:19:41 rec4: V45
2020/11/02 23:19:41 rec4: V46
2020/11/02 23:19:41 rec4: V47
2020/11/02 23:19:41 rec4: V53
2020/11/02 23:19:41 rec4: V59
2020/11/02 23:19:41 rec4: V70
2020/11/02 23:19:41 rec4: V71
2020/11/02 23:19:41 rec4: V73
2020/11/02 23:19:41 rec5: V35
2020/11/02 23:19:41 rec5: V36
2020/11/02 23:19:41 rec5: V43
2020/11/02 23:19:41 rec5: V49
2020/11/02 23:19:41 rec5: V54
2020/11/02 23:19:41 rec5: V63
2020/11/02 23:19:41 rec5: V69
2020/11/02 23:19:41 rec5: V77
2020/11/02 23:19:41 send: V78
2020/11/02 23:19:41 rec2: V44
2020/11/02 23:19:41 rec2: V50
2020/11/02 23:19:41 rec2: V51
2020/11/02 23:19:41 rec2: V64
2020/11/02 23:19:41 rec2: V65
2020/11/02 23:19:41 rec2: V66
2020/11/02 23:19:41 rec2: V72
2020/11/02 23:19:41 send: V79
2020/11/02 23:19:42 send: V80
2020/11/02 23:19:42 send: V81
2020/11/02 23:19:42 send: V82
2020/11/02 23:19:42 send: V83
2020/11/02 23:19:42 send: V84
2020/11/02 23:19:43 send: V85
2020/11/02 23:19:43 rec0: V52
2020/11/02 23:19:43 rec0: V58
2020/11/02 23:19:43 rec0: V67
2020/11/02 23:19:43 send: V86
Run Code Online (Sandbox Code Playgroud)
任何建议将不胜感激。谢谢!
缓冲肯定只是在 Kafka-Go 中发生。Sarama 不会遇到相同的行为:
2020/11/02 23:19:22 send: V0
2020/11/02 23:19:23 send: V1
2020/11/02 23:19:23 send: V2
2020/11/02 23:19:23 send: V3
2020/11/02 23:19:24 send: V4
2020/11/02 23:19:24 send: V5
2020/11/02 23:19:24 send: V6
2020/11/02 23:19:25 send: V7
2020/11/02 23:19:25 send: V8
2020/11/02 23:19:25 send: V9
2020/11/02 23:19:25 send: V10
2020/11/02 23:19:26 send: V11
2020/11/02 23:19:26 send: V12
2020/11/02 23:19:26 send: V13
2020/11/02 23:19:26 send: V14
2020/11/02 23:19:26 send: V15
2020/11/02 23:19:27 send: V16
2020/11/02 23:19:27 send: V17
2020/11/02 23:19:27 send: V18
2020/11/02 23:19:27 send: V19
2020/11/02 23:19:28 send: V20
2020/11/02 23:19:29 send: V21
2020/11/02 23:19:29 send: V22
2020/11/02 23:19:29 send: V23
2020/11/02 23:19:29 send: V24
2020/11/02 23:19:29 send: V25
2020/11/02 23:19:30 send: V26
2020/11/02 23:19:30 send: V27
2020/11/02 23:19:30 send: V28
2020/11/02 23:19:30 send: V29
2020/11/02 23:19:31 send: V30
2020/11/02 23:19:31 send: V31
2020/11/02 23:19:31 send: V32
2020/11/02 23:19:32 send: V33
2020/11/02 23:19:32 send: V34
2020/11/02 23:19:32 rec3: V8
2020/11/02 23:19:32 rec3: V14
2020/11/02 23:19:32 rec3: V15
2020/11/02 23:19:32 rec3: V16
2020/11/02 23:19:32 rec3: V17
2020/11/02 23:19:32 rec3: V20
2020/11/02 23:19:32 rec3: V21
2020/11/02 23:19:32 rec3: V23
2020/11/02 23:19:32 rec3: V29
2020/11/02 23:19:32 rec1: V0
2020/11/02 23:19:32 rec1: V9
2020/11/02 23:19:32 rec1: V22
2020/11/02 23:19:32 rec1: V28
2020/11/02 23:19:32 rec4: V4
2020/11/02 23:19:32 rec4: V5
2020/11/02 23:19:32 rec4: V7
2020/11/02 23:19:32 rec4: V10
2020/11/02 23:19:32 rec4: V11
2020/11/02 23:19:32 rec4: V12
2020/11/02 23:19:32 rec4: V18
2020/11/02 23:19:32 rec4: V24
2020/11/02 23:19:32 rec4: V25
2020/11/02 23:19:32 rec4: V30
2020/11/02 23:19:32 rec4: V31
2020/11/02 23:19:32 send: V35
2020/11/02 23:19:32 rec5: V1
2020/11/02 23:19:32 rec5: V2
2020/11/02 23:19:32 rec5: V3
2020/11/02 23:19:32 rec5: V34
2020/11/02 23:19:32 rec2: V6
2020/11/02 23:19:32 rec2: V13
2020/11/02 23:19:32 rec2: V26
2020/11/02 23:19:32 rec2: V33
2020/11/02 23:19:32 send: V36
2020/11/02 23:19:33 send: V37
2020/11/02 23:19:33 send: V38
2020/11/02 23:19:33 send: V39
2020/11/02 23:19:33 send: V40
2020/11/02 23:19:33 send: V41
2020/11/02 23:19:33 rec0: V19
2020/11/02 23:19:33 rec0: V27
2020/11/02 23:19:33 rec0: V32
2020/11/02 23:19:34 send: V42
2020/11/02 23:19:34 send: V43
2020/11/02 23:19:34 send: V44
2020/11/02 23:19:34 send: V45
2020/11/02 23:19:34 send: V46
2020/11/02 23:19:35 send: V47
2020/11/02 23:19:35 send: V48
2020/11/02 23:19:35 send: V49
2020/11/02 23:19:35 send: V50
2020/11/02 23:19:35 send: V51
2020/11/02 23:19:35 send: V52
2020/11/02 23:19:36 send: V53
2020/11/02 23:19:36 send: V54
2020/11/02 23:19:36 send: V55
2020/11/02 23:19:36 send: V56
2020/11/02 23:19:36 send: V57
2020/11/02 23:19:37 send: V58
2020/11/02 23:19:37 send: V59
2020/11/02 23:19:37 send: V60
2020/11/02 23:19:38 send: V61
2020/11/02 23:19:38 send: V62
2020/11/02 23:19:38 send: V63
2020/11/02 23:19:38 send: V64
2020/11/02 23:19:38 send: V65
2020/11/02 23:19:39 send: V66
2020/11/02 23:19:39 send: V67
2020/11/02 23:19:39 send: V68
2020/11/02 23:19:40 send: V69
2020/11/02 23:19:40 send: V70
2020/11/02 23:19:40 send: V71
2020/11/02 23:19:40 send: V72
2020/11/02 23:19:40 send: V73
2020/11/02 23:19:40 send: V74
2020/11/02 23:19:41 send: V75
2020/11/02 23:19:41 send: V76
2020/11/02 23:19:41 rec1: V41
2020/11/02 23:19:41 rec1: V56
2020/11/02 23:19:41 rec1: V68
2020/11/02 23:19:41 rec1: V74
2020/11/02 23:19:41 rec1: V75
2020/11/02 23:19:41 rec1: V76
2020/11/02 23:19:41 rec3: V37
2020/11/02 23:19:41 rec3: V40
2020/11/02 23:19:41 rec3: V42
2020/11/02 23:19:41 rec3: V48
2020/11/02 23:19:41 rec3: V55
2020/11/02 23:19:41 rec3: V57
2020/11/02 23:19:41 rec3: V60
2020/11/02 23:19:41 rec3: V61
2020/11/02 23:19:41 rec3: V62
2020/11/02 23:19:41 send: V77
2020/11/02 23:19:41 rec4: V38
2020/11/02 23:19:41 rec4: V39
2020/11/02 23:19:41 rec4: V45
2020/11/02 23:19:41 rec4: V46
2020/11/02 23:19:41 rec4: V47
2020/11/02 23:19:41 rec4: V53
2020/11/02 23:19:41 rec4: V59
2020/11/02 23:19:41 rec4: V70
2020/11/02 23:19:41 rec4: V71
2020/11/02 23:19:41 rec4: V73
2020/11/02 23:19:41 rec5: V35
2020/11/02 23:19:41 rec5: V36
2020/11/02 23:19:41 rec5: V43
2020/11/02 23:19:41 rec5: V49
2020/11/02 23:19:41 rec5: V54
2020/11/02 23:19:41 rec5: V63
2020/11/02 23:19:41 rec5: V69
2020/11/02 23:19:41 rec5: V77
2020/11/02 23:19:41 send: V78
2020/11/02 23:19:41 rec2: V44
2020/11/02 23:19:41 rec2: V50
2020/11/02 23:19:41 rec2: V51
2020/11/02 23:19:41 rec2: V64
2020/11/02 23:19:41 rec2: V65
2020/11/02 23:19:41 rec2: V66
2020/11/02 23:19:41 rec2: V72
2020/11/02 23:19:41 send: V79
2020/11/02 23:19:42 send: V80
2020/11/02 23:19:42 send: V81
2020/11/02 23:19:42 send: V82
2020/11/02 23:19:42 send: V83
2020/11/02 23:19:42 send: V84
2020/11/02 23:19:43 send: V85
2020/11/02 23:19:43 rec0: V52
2020/11/02 23:19:43 rec0: V58
2020/11/02 23:19:43 rec0: V67
2020/11/02 23:19:43 send: V86
Run Code Online (Sandbox Code Playgroud)
事实上,在某些情况下,它在确认发送之前实际接收,这让我想知道是否有内部消息发生,如果是,我是否应该关心......
recv: V1176 p: 1 offset: 355
recv: V1177 p: 2 offset: 363
send: V1177 p: 2 offset: 363
send: V1178 p: 5 offset: 377
recv: V1178 p: 5 offset: 377
recv: V1179 p: 1 offset: 356
send: V1179 p: 1 offset: 356
send: V1180 p: 1 offset: 357
recv: V1180 p: 1 offset: 357
recv: V1181 p: 1 offset: 358
send: V1181 p: 1 offset: 358
send: V1182 p: 4 offset: 393
recv: V1182 p: 4 offset: 393
send: V1183 p: 4 offset: 394
recv: V1183 p: 4 offset: 394
send: V1184 p: 3 offset: 358
recv: V1184 p: 3 offset: 358
send: V1185 p: 2 offset: 364
recv: V1185 p: 2 offset: 364
send: V1186 p: 3 offset: 359
recv: V1186 p: 3 offset: 359
recv: V1187 p: 3 offset: 360
send: V1187 p: 3 offset: 360
send: V1188 p: 5 offset: 378
recv: V1188 p: 5 offset: 378
send: V1189 p: 2 offset: 365
recv: V1189 p: 2 offset: 365
recv: V1190 p: 4 offset: 395
send: V1190 p: 4 offset: 395
send: V1191 p: 1 offset: 359
recv: V1191 p: 1 offset: 359
send: V1192 p: 4 offset: 396
recv: V1192 p: 4 offset: 396
send: V1193 p: 0 offset: 431
recv: V1193 p: 0 offset: 431
send: V1194 p: 4 offset: 397
recv: V1194 p: 4 offset: 397
recv: V1195 p: 2 offset: 366
send: V1195 p: 2 offset: 366
send: V1196 p: 3 offset: 361
recv: V1196 p: 3 offset: 361
Run Code Online (Sandbox Code Playgroud)
您需要更改ReaderConfig.MinBytes
,否则segmentio/kafka-go
将其设置为1e6 = 1 MB
,在这种情况下,Kafka 将等待那么多数据积累后再响应请求。
func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{url},
Topic: topic,
Dialer: dialer,
Partition: partition,
MinBytes: 1, // same value of Shopify/sarama
MaxBytes: 57671680,
})
}
Run Code Online (Sandbox Code Playgroud)
另一方面,默认值shopify/sarama
是 1 个字节。
参考: