Rag*_*hav 1 scala kafka-consumer-api
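I am creating the simple Kafka consumer below, based on this gist: https://gist.github.com/akhil/6dfda8a04e33eff91a20.
To print the consumed records, that gist uses "asScala", which is not recognized in my code. So how do I iterate over the return type of poll(), which is ConsumerRecords[String, String]?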
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

object KafkaConsumerEx extends App {
  val topic_name = "newtopic55"
  val consumer_group = "KafkaConsumerBatch"
  val prot = new Properties()
  prot.put("bootstrap.servers", "localhost:9092")
  prot.put("group.id", consumer_group)
  prot.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  prot.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  val kfk_consumer = new KafkaConsumer[String, String](prot)
  kfk_consumer.subscribe(util.Collections.singleton(topic_name))
  println("here")
  while (true) {
    val consumer_record: ConsumerRecords[String, String] = kfk_consumer.poll(100)
    println("records count : " + consumer_record.count())
    println("records partitions: " + consumer_record.partitions())
    consumer_record.iterator().
  }
}
Thanks in advance.
You can do this easily:
for (record <- consumer_record.iterator()) {
println(s"Here's your $record")
}
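Remember to add this import (note that JavaConversions is deprecated since Scala 2.12 and was removed in 2.13):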
import scala.collection.JavaConversions._
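On Scala versions where the implicit JavaConversions import is not available, the explicit `.asScala` converter from the question's gist may be the easier route. The following is only a sketch: `AsScalaSketch` and `collectValues` are hypothetical names, and a plain `java.util.ArrayList` stands in for `ConsumerRecords` so that it runs without a broker. It relies on `ConsumerRecords` implementing `java.lang.Iterable`, so the same `.asScala` call should apply to `consumer_record` directly.

```scala
import java.util.{ArrayList => JArrayList}
// Provides the explicit .asScala extension method.
// On Scala 2.13+ this import still compiles but is deprecated;
// scala.jdk.CollectionConverters._ is the replacement there.
import scala.collection.JavaConverters._

object AsScalaSketch {
  // Hypothetical helper: accepts any java.lang.Iterable, which is the
  // interface ConsumerRecords[K, V] implements, and returns a Scala List.
  def collectValues[T](records: java.lang.Iterable[T]): List[T] =
    records.asScala.toList

  def main(args: Array[String]): Unit = {
    // Stand-in for the result of kfk_consumer.poll(...)
    val fakeRecords: java.lang.Iterable[String] = {
      val list = new JArrayList[String]()
      list.add("first")
      list.add("second")
      list
    }

    // Same loop shape as in the answer, but with an explicit conversion
    for (record <- fakeRecords.asScala) {
      println(s"Here's your $record")
    }
  }
}
```

In the consumer from the question, the equivalent loop would be `for (record <- consumer_record.asScala) { ... }`, assuming the converter import above is in scope.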