Yoh*_*ohn 17 apache-kafka kafka-consumer-api
我开始使用最新的Kafka文档http://kafka.apache.org/documentation.html.但是当我尝试使用新的Consumer API时遇到了一些问题.我已完成以下步骤:
1.添加新的依赖项
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
2.添加配置
Map<String, Object> config = new HashMap<String, Object>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"host:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
Run Code Online (Sandbox Code Playgroud)
3.使用KafkaConsumer API
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试从代理轮询消息时,我只得到null:
Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
process(records);
else
System.err.println("null");
Run Code Online (Sandbox Code Playgroud)
然后,在检查源代码后,我知道消费者有什么问题:
@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
// TODO Auto-generated method stub
return null;
}
Run Code Online (Sandbox Code Playgroud)
更糟糕的是,我找不到关于0.8.2 API的任何其他有用信息,因为关于Kafka的所有用法都与最新版本不兼容.有人能帮助我吗?非常感谢.
我还尝试在 Kafka 0.8.2.1 之上编写一个 Consumer 来读取新 Producer 生成的消息。
到目前为止,我得到的是生产者 API 已准备就绪并可用,而在消费者方面,我们必须等待 0.8.3,正如 @habsq 指出的那样,您已经发现包含一些针对消费者的代码,但它仍然无法正常工作。
因此,要使用的客户端(当前的客户端 API)是在当前 Kafka 版本的“核心”项目中找到的客户端,即 0.8.2.1(最好不要将客户端降级到任何其他版本)。
因此,现在我们需要导入两个 jar:一个用于“新”java 客户端,另一个用于核心项目,这也取决于您使用的 scala 版本(我使用 2.11)。
就我而言,我使用 graddle 来管理依赖项,所以我只需要导入
dependencies {
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.8.2.1'
compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.8.2.1'
}
Run Code Online (Sandbox Code Playgroud)
当您更新依赖项时,它将获得所有需要的库。
如果您使用不同的 Scala 版本,只需更改版本即可;无论如何,你可以在mavencentral上找到所有不同的版本或完整的pom: http://search.maven.org/#search| ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A %220.8.2.1%22
如果您使用这些 Consumer 实现,那么当前的所有示例都应该照常工作。
PS参考:Kafka-users ml线程http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2
| 归档时间: |
|
| 查看次数: |
7321 次 |
| 最近记录: |