The*_*aDe 0 .net c# apache-kafka kafka-producer-api confluent-kafka-dotnet
我在最近的 C# 项目中使用 Confluence kafka 包。我通过以下方式创建了一个生产者:
prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};
foreach(msg in msglist){
using(var producer = new ProducerBuilder<Null, string>(prodConfig).Build()){
producer.ProduceAsync(topic, new Message<Null, string> {Value = msg});
}
}
Run Code Online (Sandbox Code Playgroud)
但问题是我的一些信息没有传达给消费者。他们在某个地方迷路了。但是,如果我对生产者使用await ,那么所有消息都会被传递。如何无需等待即可传递我的所有消息。(我有一个分区)
首先,您应该只使用一条消息Producer来发送您的消息msgList,因为为每条消息创建一条新消息Producer非常昂贵。
你能做的就是使用Produce()方法与Flush(). 使用Produce()您将异步发送消息而无需等待响应。然后调用一个Flush()will 阻塞,直到所有正在传输的消息都被传递。
var prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};
using var producer = new ProducerBuilder<Null, string>(prodConfig).Build();
foreach (var msg in msglist)
{
producer.Produce(topic, new Message<Null, string> { Value = msg });
}
producer.Flush();
Run Code Online (Sandbox Code Playgroud)
如果没有await,Flush()您可能会丢失消息,因为您的生产者可能会在所有消息传递之前被处置。
| 归档时间: |
|
| 查看次数: |
1787 次 |
| 最近记录: |