我已经检查了(at commit )main的分支,但我注意到在 VS Code 中我无法像往常一样“转到定义”。如果我将鼠标悬停在 中的包名称上,我会收到 linter 警告github.com/Shopify/sarama947343309601b4eb3c2fa3e7d15d701b503dd491 saramafunctional_consumer_group_test.go
No packages found for open file /Users/kurtpeek/go/src/github.com/Shopify/sarama/functional_consumer_group_test.go: <nil>.
If this file contains build tags, try adding "-tags=<build tag>" to your gopls "buildFlags" configuration (see (https://github.com/golang/tools/blob/master/gopls/doc/settings.md#buildflags-string).
Otherwise, see the troubleshooting guidelines for help investigating (https://github.com/golang/tools/blob/master/gopls/doc/troubleshooting.md).go list
Run Code Online (Sandbox Code Playgroud)
(参见下面的屏幕截图)。
从命令行,如果我尝试gopls该文件,我会收到类似的错误:
> gopls check functional_consumer_group_test.go
gopls: no packages returned: packages.Load error
Run Code Online (Sandbox Code Playgroud)
我怀疑这与该文件中的构建约束( https://pkg.go.dev/cmd/go#hdr-Build_constraints )有关,来自https://github.com/Shopify/sarama/blob/947343309601b4eb3c2fa3e7d15d701b503dd491 /function_consumer_group_test.go#L1-L2,
No packages found for open file /Users/kurtpeek/go/src/github.com/Shopify/sarama/functional_consumer_group_test.go: <nil>.
If this …Run Code Online (Sandbox Code Playgroud) 是否有可能在sarama中创建kafka主题?我知道java API可以让你创建主题,但我找不到任何有关如何在sarama中执行此操作的信息.如果可能的话,我应该使用的示例或解释将非常感谢提前
Shopify/sarama是否提供类似于transactional.idJVM API的选项?
该库支持幂等(Config.Producer.Idemponent,类似于enable.idempotence),但我不明白如何在没有transactional.id.
如果我错了,请纠正我,Sarama 中缺少有关这些选项的文档。但是根据 JVM 文档,没有标识符的幂等性将受到单个生产者会话的限制。换句话说,当生产者失败并重新启动时,我们将失去保证。
我在源代码和一些测试(例如)中找到了相关属性,但不明白如何在外部使用它们。
更新:原来我在 Docker 中的端口有问题。不知道为什么解决了这个现象。
我相信我遇到了一个奇怪的错误。我正在使用Sarama库并且能够成功创建消费者。
func main() {
config = sarama.NewConfig()
config.ClientID = "go-kafka-consumer"
config.Consumer.Return.Errors = true
// Create new consumer
master, err := sarama.NewConsumer("localhost:9092", config)
if err != nil {
panic(err)
}
defer func() {
if err := master.Close(); err != nil {
panic(err)
}
}()
partitionConsumer, err := master.ConsumePartition("myTopic",0,
sarama.OffsetOldest)
if err != nil {
panic(err)
}
}
Run Code Online (Sandbox Code Playgroud)
一旦我打破这段代码并移到主程序之外,我就会遇到错误:
kafka:客户端已经没有可用的代理可以与之交谈(您的集群是否可访问?)
我已将我的代码拆分如下:我现在已将之前的 main() 方法转换为一个消费者包,其中包含一个名为 NewConsumer() 的方法,而我的新 main() 像这样调用 NewConsumer():
c := consumer.NewConsumer()
Run Code Online (Sandbox Code Playgroud)
恐慌声明在行中被触发sarama.NewConsumer并打印出来kafka: client has …
我正在尝试使用zap 记录器包创建一个带有文件、控制台和 Kafka 接收器的核心。我有一些非常特定INFO级别的日志,我想将它们发送到 Kafka 主题以供下游消费者处理。然而,在当前的实现中,我获得了INFOKafka 主题中的所有级别日志,甚至是我不想要的日志。
有没有一种方法可以使用通用的 zap 记录器对象来防止同一级别的不需要的日志不进入任何一个特定的接收器?
下面是我用来创建单个记录器对象的函数。
func newZapLogger(config Configuration) (Logger, error) {
var writer zapcore.WriteSyncer
cores := []zapcore.Core{}
if config.EnableFile {
getLogLevel(config.FileLevel)
if config.LogConfig == true {
writer = zapcore.Lock(zapcore.AddSync(&lj.Logger{
Filename: config.FileLocation,
MaxSize: config.LogMaxSize,
Compress: config.LogCompression,
MaxAge: config.LogMaxAge,
}))
} else {
writer = zapcore.Lock(zapcore.AddSync(&lj.Logger{
Filename: config.FileLocation,
}))
}
cores = append(cores, zapcore.NewCore(getEncoder(config.FileJSONFormat, config.IsColour), writer, atomLevel))
}
if config.EnableConsole {
getLogLevel(config.ConsoleLevel)
switch config.Stream {
case 1:
writer = zapcore.Lock(os.Stdout) …Run Code Online (Sandbox Code Playgroud) 我有一个 Kafka 实例正在运行(本地,在 Docker 中),并且我使用sarama 包在 Go 中创建了一个生产者。
由于我想在我的主题上使用 Kafka Streams,生产者必须在消息中嵌入时间戳,否则我会收到这个丑陋的错误消息:
org.apache.kafka.streams.errors.StreamsException:输入记录ConsumerRecord(主题=crawler_events,分区= 0,偏移= 0,CreateTime = -1,序列化键大小= -1,序列化值大小= 187,标题= RecordHeaders( headers = []、isReadOnly = false)、key = null、value = {XXX}) 具有无效(负)时间戳。可能是因为使用 0.10 之前版本的生产者客户端将此记录写入 Kafka,而没有嵌入时间戳,或者因为输入主题是在 Kafka 集群升级到 0.10+ 之前创建的。使用不同的 TimestampExtractor 来处理此数据。
以下是在我的 Go 程序中发送消息的代码部分:
// Init a connection to the Kafka host,
// create the producer,
// and count successes and errors in delivery
func (c *kafkaClient) init() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
c.config = *config
var err error …Run Code Online (Sandbox Code Playgroud) 我正在使用 github.com/Shopify/sarama 包与 Kafka 交互。在我当前的方法中,我可以连接到代理并毫无问题地获取所有主题名称(下面的消费者代码)。
但是,当我尝试使用管理客户端(下面的管理代码)删除某些主题时,我收到“拨号 tcp:查找 ip-xx-xx.ec2.internal:没有这样的主机”错误。
我不知道为什么会收到此错误。我非常感谢任何提示或可能的解决方案。
消费者
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
//get broker
cluster, err := sarama.NewConsumer("localhost:9092", config)
if err != nil {
panic(err)
}
defer func() {
if err := cluster.Close(); err != nil {
panic(err)
}
}()
//get all topic from cluster
topics, _ := cluster.Topics()
Run Code Online (Sandbox Code Playgroud)
行政
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Version = sarama.V2_4_0_0
//admin broker
admin, err := sarama.NewClusterAdmin("localhost:9092", config)
if err != nil {
panic(err)
} …Run Code Online (Sandbox Code Playgroud) 最近,我开始学习使用kafka工作。我正在开发的项目使用sarama。
为了阅读消息,我使用ConsumerGroup.
foo如果返回,我需要在一段时间后再次阅读该消息false。如何才能做到这一点?
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
if ok := foo(message); ok {
session.MarkMessage(message, "")
} else {
// ???
}
}
return nil
}
Run Code Online (Sandbox Code Playgroud) 可用的库是sarama(或其扩展sarama-cluster),但是没有提供消费者群体示例,在sarama或sarama-cluster中均未提供。
我不了解API。我可以举一个为主题创建消费者群体的示例吗?
我正在尝试使用sarama(管理员模式)创建主题。如果没有ConfigEntries,则可以正常运行。但是我需要定义一些配置。
我设置了主题配置(这里发生了错误):
tConfigs := map[string]*string{
"cleanup.policy": "delete",
"delete.retention.ms": "36000000",
}
Run Code Online (Sandbox Code Playgroud)
但是然后我得到一个错误:
./main.go:99:28: cannot use "delete" (type string) as type *string in map value
./main.go:100:28: cannot use "36000000" (type string) as type *string in map value
Run Code Online (Sandbox Code Playgroud)
我正在尝试使用这样的管理模式:
err = admin.CreateTopic(t.Name, &sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: 3,
ConfigEntries: tConfigs,
}, false)
Run Code Online (Sandbox Code Playgroud)
这是sarama模块中定义CreateTopic()的代码行, 网址为https://github.com/Shopify/sarama/blob/master/admin.go#L18
基本上,我不了解指针字符串的映射是如何工作的:)
我尝试过shopify/sarama库来使用kafka消息。Consumer接口和接口我都用过ConsumerGroup。我可以使用ConsumePartition()中的方法从特定分区消费Consumer。但是当我使用ConsumerGroup接口时,我似乎没有能力从特定分区消费。
有没有办法让我将某些分区分配给消费者组内的特定消费者?或者这是我无法干涉的事情?