我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。
消费者.java
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
public class KafkaConsumer extends Thread {
final static String clientId = "SimpleConsumerDemoClient";
final static String TOPIC = " AATest";
ConsumerConnector consumerConnector;
public static void main(String[] argv) throws UnsupportedEncodingException {
KafkaConsumer KafkaConsumer = new KafkaConsumer();
KafkaConsumer.start();
}
public KafkaConsumer(){
Properties properties = new Properties();
properties.put("zookeeper.connect","10.200.208.59:2181");
properties.put("group.id","test-group");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
}
@Override
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
System.out.println(stream);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while(it.hasNext())
System.out.println("from it");
System.out.println(new String(it.next().message()));
}
private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
for(MessageAndOffset messageAndOffset: messageSet) {
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(new String(bytes, "UTF-8"));
}
}
}
Run Code Online (Sandbox Code Playgroud)
当我运行上面的代码时,我在控制台中什么也没有得到,屏幕后面的 java 生产者程序在“AATest”主题下连续发布数据。在动物园管理员控制台中,当我尝试运行上面的 consumer.java 时,我得到以下几行
[2015-04-30 15:57:31,284] INFO Accepted socket connection from /10.200.208.59:51780 (org.apache.zookeeper.
server.NIOServerCnxnFactory)
[2015-04-30 15:57:31,284] INFO Client attempting to establish new session at /10.200.208.59:51780 (org.apa
che.zookeeper.server.ZooKeeperServer)
[2015-04-30 15:57:31,315] INFO Established session 0x14d09cebce30007 with negotiated timeout 6000 for clie
nt /10.200.208.59:51780 (org.apache.zookeeper.server.ZooKeeperServer)
Run Code Online (Sandbox Code Playgroud)
此外,当我运行一个指向 AATest 主题的单独控制台消费者时,我将生产者生成的所有数据都用于该主题。
消费者和经纪人都在同一台机器上,而生产者在不同的机器上。这实际上类似于这个问题。但是经历它对我有帮助。请帮我。
Different answer but it happened to be initial offset (auto.offset.reset) for a consumer in my case. So, setting up auto.offset.reset=earliest fixed the problem in my scenario. Its because I was publishing event first and then starting a consumer.
By default, consumer only consumes events published after it started because auto.offset.reset=latest by default.
eg. consumer.properties
bootstrap.servers=localhost:9092
enable.auto.commit=true
auto.commit.interval.ms=1000
session.timeout.ms=30000
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Run Code Online (Sandbox Code Playgroud)
class KafkaEventConsumerSpecs extends FunSuite {
case class TestEvent(eventOffset: Long, hashValue: Long, created: Date, testField: String) extends BaseEvent
test("given an event in the event-store, consumes an event") {
EmbeddedKafka.start()
//PRODUCE
val event = TestEvent(0l, 0l, new Date(), "data")
val config = new Properties() {
{
load(this.getClass.getResourceAsStream("/producer.properties"))
}
}
val producer = new KafkaProducer[String, String](config)
val persistedEvent = producer.send(new ProducerRecord(event.getClass.getSimpleName, event.toString))
assert(persistedEvent.get().offset() == 0)
assert(persistedEvent.get().checksum() != 0)
//CONSUME
val consumerConfig = new Properties() {
{
load(this.getClass.getResourceAsStream("/consumer.properties"))
put("group.id", "consumers_testEventsGroup")
put("client.id", "testEventConsumer")
}
}
assert(consumerConfig.getProperty("group.id") == "consumers_testEventsGroup")
val kafkaConsumer = new KafkaConsumer[String, String](consumerConfig)
assert(kafkaConsumer.listTopics().asScala.map(_._1).toList == List("TestEvent"))
kafkaConsumer.subscribe(Collections.singletonList("TestEvent"))
val events = kafkaConsumer.poll(1000)
assert(events.count() == 1)
EmbeddedKafka.stop()
}
}
Run Code Online (Sandbox Code Playgroud)
But if consumer is started first and then published, the consumer should be able to consume the event without auto.offset.reset required to be set to earliest.
https://kafka.apache.org/documentation/#consumerconfigs
| 归档时间: |
|
| 查看次数: |
15236 次 |
| 最近记录: |