我刚刚起步并运行Kafka 0.8 beta 1.我有一个非常简单的示例启动和运行,问题是,我只能让一个消息消费者工作,而不是几个.也就是说,runSingleWorker()方法可以工作.run()方法不起作用:
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import com.truecar.inventory.worker.core.application.config.AppConfig;
public class ConsumerThreadPool {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
private static ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
public ConsumerThreadPool(String topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context.getBean("consumerConfig"));
this.topic = topic;
}
public void shutdown() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
}
public void run(Integer numThreads) …Run Code Online (Sandbox Code Playgroud) 我想尝试kafka 0.8(据我所知它已经发布).但是我在哪里可以找到kafka maven存储库.
我应该添加哪些额外的存储库URL?
我找到了一些博客
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.0</artifactId>
<version>0.8.0-SHA</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
但它不起作用.我正在寻找合适的maven依赖.或者我应该从git中检出它并部署在我们的内部神器中?