Nis*_*ant 10 java cassandra redis apache-kafka kafka-consumer-api
我正在使用Kafka,我们有一个用例来构建一个容错系统,甚至连一条消息都不会错过.所以这就是问题所在:如果由于任何原因(ZooKeeper down,Kafka broker等)向Kafka发布失败,我们如何能够有效地处理这些消息并在事情再次恢复后重播它们.正如我所说的那样,即使单个消息失败也无法承受.另一个用例是我们还需要在任何给定的时间点知道有多少消息由于任何原因而无法发布到Kafka,例如计数器功能,现在这些消息需要再次重新发布.
其中一个解决方案是将这些消息推送到某个数据库(如Cassandra,其中写入速度非常快,但我们还需要计数器功能,我猜Cassandra计数器功能并不是那么好,我们不想使用它.)可以处理这种负载也为我们提供了非常准确的计数器设施.
这个问题更多来自架构方面,然后是使用哪种技术来实现这一目标.
PS:我们处理像3000TPS这样的地方.因此,当系统启动失败时,这些失败的消息可以在非常短的时间内快速增长.我们正在使用基于java的框架.
谢谢你的帮助!
我参加聚会超级迟到。但我发现上面的答案中缺少一些东西:)
选择像 Cassandra 这样的分布式系统的策略是一个不错的主意。一旦Kafka启动并正常,您可以重试写入其中的所有消息。
我想从“知道有多少消息在给定时间发布失败”来回答
从标签中,我看到您正在使用apache-kafka和kafka-consumer-api。您可以为您的生产者编写一个自定义回调,这个回调可以告诉您消息是失败还是成功发布。失败时,记录消息的元数据。
现在,您可以使用日志分析工具来分析您的故障。Splunk 就是这样一种不错的工具。
下面是一个小代码片段,可以更好地解释我正在谈论的回调:
public class ProduceToKafka {
private ProducerRecord<String, String> message = null;
// TracerBulletProducer class has producer properties
private KafkaProducer<String, String> myProducer = TracerBulletProducer
.createProducer();
public void publishMessage(String string) {
ProducerRecord<String, String> message = new ProducerRecord<>(
"topicName", string);
myProducer.send(message, new MyCallback(message.key(), message.value()));
}
class MyCallback implements Callback {
private final String key;
private final String value;
public MyCallback(String key, String value) {
this.key = key;
this.value = value;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
log.info("--------> All good !!");
} else {
log.info("--------> not so good !!");
log.info(metadata.toString());
log.info("" + metadata.serializedValueSize());
log.info(exception.getMessage());
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
如果您分析"--------> not so good !!"每个时间单位的日志数量,您可以获得所需的洞察力。
神速!
Kafka以分布式,容错的方式构建的原因是处理与您的问题完全相同的问题,核心组件的多次故障应该避免服务中断.要避免关闭Zookeeper,请至少部署3个Zookeepers实例(如果这是在AWS中,请在可用区域中部署它们).要避免代理失败,请部署多个代理,并确保在生产者bootstrap.servers属性中指定多个代理.要确保Kafka群集已在持久庄园中编写您的消息,请确保acks=all在生产者中设置该属性.当所有同步副本确认收到消息时(以吞吐量为代价),这将确认客户端写入.您还可以设置排队限制,以确保如果对代理的写入开始备份,您可以捕获异常并处理它并可能重试.
使用Cassandra(另一个经过深思熟虑的分布式,容错系统)来"分级"你的写入似乎不会给你的架构增加任何可靠性,但确实增加了复杂性,而且Cassandra没有被编写为一个消息队列一个消息队列,我会避免这种情况.
如果配置正确,Kafka应该可以处理所有的消息写入并提供适当的保证.
| 归档时间: |
|
| 查看次数: |
6513 次 |
| 最近记录: |