相关疑难解决方法(0)

Kafka是否支持请求响应消息传递

我正在调查Kafka 9作为业余爱好项目,并完成了一些"Hello World"类型示例.

我必须考虑基于请求响应消息传递的Real World Kafka应用程序,更具体地说,如何将Kafka请求消息链接到其响应消息.

我正在考虑使用生成的UUID作为请求消息密钥,并将此请求UUID用作关联的响应消息密钥.与WebSphere MQ具有消息关联ID的机制大致相同.

我的结束2结束过程将是.

1).Kafka客户端生成随机UUID并发送单个Kafka请求消息.2).服务器将使用此请求消息提取并存储请求UUID值3).使用消息有效内容完成业务流程.4).响应响应消息,该消息使用来自请求消息的存储的UUID值作为响应消息Key.5).Kafka客户端轮询响应主题,直到它超时或检索具有原始请求UUID值的消息.

我关注的是Kafka Consumer轮询将从响应主题中删除其他客户端消息,并增加偏移量,使其他客户端失败.

我是否尝试在一个用例中应用Kafka它从未设计过?

是否可以在Kafka中实现请求/响应消息传递?

mom apache-kafka kafka-consumer-api kafka-producer-api

19
推荐指数
1
解决办法
1万
查看次数

Kafka制作人可以创建主题和分区吗?

目前我正在评估不同的消息系统.有一个与Apache Kafka有关的问题,我无法回答.

Kafka制作人是否可以动态创建主题和分区(在现有主题上)?如果是的话,它有什么不利之处吗?

提前致谢

messaging apache-kafka kafka-producer-api

8
推荐指数
2
解决办法
1万
查看次数

kafka消费者动态检测添加的主题

我正在使用KafkaConsumer来消费来自Kafka服务器(主题)的消息.

  • 它适用于在启动消费者代码之前创建的主题...

但问题是,如果动态创建主题(我的意思是说消费者代码启动后),它将无法工作,但API表示它将支持动态主题创建..这是您的参考链接..

使用的Kafka版本:0.9.0.1

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

这是JAVA代码......

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Pattern r = Pattern.compile("siddu(\\d)*");

    consumer.subscribe(r, new HandleRebalance());
    try {
         while(true) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(partition.partition()  + ": "  +record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api

5
推荐指数
2
解决办法
4701
查看次数