Mar*_*son 6 go race-condition apache-kafka uber-zap
我正在创建一个自定义日志记录器(logger),它除了可以把日志输出到 stdout 和 stderr 之外,还可以把日志写入 Kafka(代码示例在这里:https://github.com/roppa/kafka-go)。我们有多个主题(topic),所以需要多个日志记录器,但同时使用多个日志记录器时出现了一些奇怪的现象。当两个 kafka-go writer 都设置为异步时,消费者收不到任何消息;当一个是异步、另一个是同步时,我们会收到这样的消息:
//consumer topica
{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:04.023Z","msg":"topic-a log 1","UID":"abc123","ns":"test-service"}
{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:05.078Z","msg":"topic-a log 2","UID":"abc123","ns":"test-service"}
{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:06.085Z","msg":"topic-a log 3","UID":"abc123","ns":"test-service"}
//consumer topicb
2020-12-09T15:31:06.085Z INFO topic-a log 3 {"UID": "abc123", "ns": "test-service"}
2","UID":"abc123","ns":"test-service"}
Run Code Online (Sandbox Code Playgroud)
更改同步/异步设置会产生完全不同的结果。我刚接触 Go 不久。
这是 main.go:
//consumer topica
{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:04.023Z","msg":"topic-a log 1","UID":"abc123","ns":"test-service"}
{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:05.078Z","msg":"topic-a log 2","UID":"abc123","ns":"test-service"}
{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:06.085Z","msg":"topic-a log 3","UID":"abc123","ns":"test-service"}
//consumer topicb
2020-12-09T15:31:06.085Z INFO topic-a log 3 {"UID": "abc123", "ns": "test-service"}
2","UID":"abc123","ns":"test-service"}
Run Code Online (Sandbox Code Playgroud)
这是 main.go:
package main
import (
"context"
"kafka-log/logger"
)
// main builds two Kafka-backed loggers for the same namespace and broker, one
// per topic, and writes three log lines to each, alternating between them.
// loggerA is created with async=false and loggerB with async=true.
//
// NOTE(review): main returns right after the last CInfo call and nothing
// closes or flushes the underlying Kafka writers, so messages queued by the
// asynchronous logger may not have been sent when the process exits.
func main() {
	loggerA := logger.Init("test-service", "localhost:9092", "topica", false, false)
	loggerB := logger.Init("test-service", "localhost:9092", "topicb", false, true)

	// Every entry carries the same request identifier.
	ctx := context.WithValue(context.Background(), logger.UID, "abc123")

	// The order of this table is the order in which the entries are emitted.
	steps := []struct {
		log *logger.Logger
		msg string
	}{
		{loggerA, "topic-a log 1"},
		{loggerB, "topic-b log 1"},
		{loggerA, "topic-a log 2"},
		{loggerB, "topic-b log 2"},
		{loggerA, "topic-a log 3"},
		{loggerB, "topic-b log 3"},
	}
	for _, step := range steps {
		step.log.CInfo(ctx, step.msg)
	}
}
Run Code Online (Sandbox Code Playgroud)
我正在为消费者使用这些:
# Start one console consumer per topic inside the container named "kafka",
# both pointed at the broker on localhost:9092.
docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic topica
docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic topicb
Run Code Online (Sandbox Code Playgroud)
这是 logger/logger.go(原帖中的 kafka docker-compose 文件未包含在此处):
package logger
import (
"context"
"os"
"github.com/segmentio/kafka-go"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// key is the unexported string type used for the context keys this package
// defines (see UID).
type key string

// Logger embeds a *zap.Logger and additionally records the name of the
// system (namespace, Ns) that is attached to entries written through CInfo.
type Logger struct {
	*zap.Logger
	Ns string
}

// KConfig describes the Kafka side of a logger: the Namespace, the Broker
// address (eg 'localhost:9092'), the Topic (eg 'topic-a') and whether the
// writer should operate asynchronously (Async).
type KConfig struct {
	Namespace string
	Broker    string
	Topic     string
	Async     bool
}

// producerInterface is the single method KafkaProducer needs from its
// underlying writer; keeping it an interface allows a substitute to be
// supplied in place of a real Kafka writer.
type producerInterface interface {
	WriteMessages(ctx context.Context, msgs ...kafka.Message) error
}

// KafkaProducer pairs a message producer with the Kafka topic it was
// created for.
type KafkaProducer struct {
	Producer producerInterface
	Topic    string
}
// UID is the context key under which the unique request identifier is
// stored; its string value is "request_id".
const UID key = "request_id"
// customConfig is the base encoder configuration from which getLogger builds
// its console and JSON encoders.
var customConfig = zapcore.EncoderConfig{
	// Names of the keys under which the standard entry fields are emitted.
	MessageKey:    "msg",
	LevelKey:      "level",
	TimeKey:       "timeStamp",
	NameKey:       "logger",
	CallerKey:     "caller",
	StacktraceKey: "stacktrace",
	FunctionKey:   zapcore.OmitKey, // the function name is left out

	// How individual values are rendered.
	EncodeLevel:    zapcore.CapitalColorLevelEncoder,
	EncodeTime:     zapcore.ISO8601TimeEncoder,
	EncodeDuration: zapcore.SecondsDurationEncoder,
	LineEnding:     zapcore.DefaultLineEnding,
}
// CInfo logs msg at Info level. The fields extracted from ctx come first,
// then the caller-supplied fields, then the logger's namespace.
func (l *Logger) CInfo(ctx context.Context, msg string, fields ...zap.Field) {
	enriched := consolidate(ctx, l.Ns, fields...)
	l.Info(msg, enriched...)
}
// consolidate builds the complete field list for one entry: the fields
// derived from ctx, followed by the explicit fields, followed by an "ns"
// field holding namespace.
func consolidate(ctx context.Context, namespace string, fields ...zap.Field) []zap.Field {
	merged := ctxToZapFields(ctx)
	merged = append(merged, fields...)
	merged = append(merged, zap.String("ns", namespace))
	return merged
}
// Level filters used when wiring up the cores.
// See advanced config example: https://github.com/uber-go/zap/blob/master/example_test.go#L105
var (
	// lowPriority accepts levels strictly between Debug and Error.
	lowPriority = zap.LevelEnablerFunc(func(level zapcore.Level) bool {
		return zapcore.DebugLevel < level && level < zapcore.ErrorLevel
	})

	// debugPriority accepts every level below Error, Debug included.
	debugPriority = zap.LevelEnablerFunc(func(level zapcore.Level) bool {
		return zapcore.ErrorLevel > level
	})

	// kafkaPriority accepts every level above Debug.
	kafkaPriority = zap.LevelEnablerFunc(func(level zapcore.Level) bool {
		return zapcore.DebugLevel < level
	})
)
// Init returns a new Logger for the module called namespace.
//
// When both broker and topic are non-empty a Kafka producer is created for
// them (asynchronous if async is true) and attached to the logger; if either
// is empty the logger writes to the console only. debug selects the console
// level filter (see getPriority).
func Init(namespace, broker, topic string, debug, async bool) *Logger {
	var producer *KafkaProducer
	if broker != "" && topic != "" {
		cfg := KConfig{
			Broker: broker,
			Topic:  topic,
			Async:  async,
		}
		producer = NewKafkaProducer(&cfg)
	}

	return &Logger{
		Logger: getLogger(debug, producer),
		Ns:     namespace,
	}
}
// getLogger assembles the zap logger returned by Init.
//
// Two console cores are always installed: one writing to stdout, filtered by
// getPriority(debug), and one writing to stderr for Error level and above.
// When kp is non-nil a third core JSON-encodes entries accepted by
// kafkaPriority and hands them to kp.
//
// No Sync is issued here because nothing has been logged yet; callers that
// need buffered output flushed should call Sync on the returned logger
// before exiting.
func getLogger(debug bool, kp *KafkaProducer) *zap.Logger {
	// optimise message for console output (human readable)
	consoleEncoder := zapcore.NewConsoleEncoder(customConfig)

	// cores are logger interfaces.
	// Lock wraps a WriteSyncer in a mutex to make it safe for concurrent use.
	// See https://godoc.org/go.uber.org/zap/zapcore
	cores := []zapcore.Core{
		zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stdout), getPriority(debug)),
		zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stderr), zap.ErrorLevel),
	}

	if kp != nil {
		// customConfig colours the level for terminals. Messages sent to
		// Kafka are machine-read JSON, so use a copy of the config with the
		// plain level encoder to keep ANSI escape sequences out of the
		// payload.
		kafkaConfig := customConfig
		kafkaConfig.EncodeLevel = zapcore.CapitalLevelEncoder

		cores = append(cores, zapcore.NewCore(
			zapcore.NewJSONEncoder(kafkaConfig),
			zapcore.Lock(zapcore.AddSync(kp)),
			kafkaPriority,
		))
	}

	// join inputs, encoders, level-handling functions into cores, then "tee" together
	return zap.New(zapcore.NewTee(cores...))
}
// getPriority picks the level filter for the stdout core: debugPriority when
// debug is true, lowPriority otherwise.
func getPriority(debug bool) zap.LevelEnablerFunc {
	priority := lowPriority
	if debug {
		priority = debugPriority
	}
	return priority
}
// ctxToZapFields turns the values this package looks up in ctx into zap
// fields. Currently that is a single "UID" field holding the string stored
// under the UID key; it is the empty string when the key is absent or the
// stored value is not a string.
func ctxToZapFields(ctx context.Context) []zap.Field {
	var requestID string
	if value, ok := ctx.Value(UID).(string); ok {
		requestID = value
	}
	return []zap.Field{zap.String("UID", requestID)}
}
// NewKafkaProducer creates a kafka-go writer for the broker and topic named
// in c and returns it wrapped in a KafkaProducer that also remembers the
// topic. c.Async is passed through to the writer; c.Namespace is not used.
func NewKafkaProducer(c *KConfig) *KafkaProducer {
	writerConfig := kafka.WriterConfig{
		Brokers:      []string{c.Broker},
		Topic:        c.Topic,
		Balancer:     &kafka.Hash{},
		Async:        c.Async,
		RequiredAcks: -1, // -1 = all
	}

	producer := &KafkaProducer{Topic: c.Topic}
	producer.Producer = kafka.NewWriter(writerConfig)
	return producer
}
// Write implements io.Writer: it wraps msg in a kafka.Message with an empty
// key and passes it to the underlying producer.
//
// It returns len(msg) and a nil error when the producer accepts the message,
// and 0 together with the producer's error otherwise.
func (kp *KafkaProducer) Write(msg []byte) (int, error) {
	// An io.Writer must not retain the slice it is given: the caller may
	// reuse msg as soon as Write returns, while an asynchronous producer may
	// still be holding the message at that point. Hand the producer a
	// private copy so a later log entry cannot overwrite a queued payload.
	payload := make([]byte, len(msg))
	copy(payload, msg)

	err := kp.Producer.WriteMessages(context.Background(), kafka.Message{
		Key:   []byte(""),
		Value: payload,
	})
	if err != nil {
		// Nothing was written, so do not report any bytes as consumed.
		return 0, err
	}
	return len(msg), nil
}
Run Code Online (Sandbox Code Playgroud)
我猜你的程序在异步消息来得及发送之前就已经退出了(不过,如果我没有看错你的示例,让我感到奇怪的是收到的日志消息只有 "topic-a log 3")。与 JavaScript 不同,Go 不会等到所有线程/goroutine 都结束之后才退出。
另外需要特别指出 kafka-go 中 Async 配置项的文档注释:
// Setting this flag to true causes the WriteMessages method to never block.
// It also means that errors are ignored since the caller will not receive
// the returned value. Use this only if you don't care about guarantees of
// whether the messages were written to kafka.
Run Code Online (Sandbox Code Playgroud)
在解决方案方面:我认为你可以通过在 writer 上调用 Close 来解决这个问题:
https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.Close
Close 会刷新尚未完成的写入,并在返回之前等待所有写入完成。调用 Close 之后,writer 不再接受新的写入,之后再调用 WriteMessages 等方法将失败并返回 io.ErrClosedPipe。
你需要把底层的 KafkaProducer.Producer 暴露出来,并在程序退出之前调用 KafkaProducer.Producer.Close。
也许有更巧妙的方式来组织清理逻辑,但要刷新待发送的消息,我似乎找不到比直接在 writer 上调用 Close 更简单的办法。