ann*_*iid 6 scala apache-kafka
我正在尝试在scala中实现kafka使用者.我已经看过一百万个关于如何用Java做的教程,甚至一些(比如这个)说它是scala的,但它是用Java编写的.
有谁知道我在哪里可以找到如何在Scala中编写它的示例?我刚开始学习Scala所以也许链接的例子可以在Scala中使用,即使它是用Java编写的,但我老实说我不知道我现在正在做什么.我google的所有内容都只是将我与如何用Java联系起来.
Yuv*_*kov 11
您在Java中看到大多数示例的原因是新的KafkaProducer0.8.2.2是用Java编写的.
假设您使用sbt作为构建系统,并假设您使用Kafka 0.8.2.2(您可以根据需要更改版本),您将需要:
libraryDependencies ++= {
Seq(
"org.apache.kafka" %% "kafka" % "0.8.2.2",
"org.apache.kafka" % "kafka-clients" % "0.8.2.2",
)
}
Run Code Online (Sandbox Code Playgroud)
一个简单的例子应该让你开始:
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
object KafkaExample {
def main(args: Array[String]): Unit = {
val properties = new Properties()
properties.put("bootstrap.servers", "localhost:9092")
properties.put("group.id", "consumer-tutorial")
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[StringDeserializer])
val kafkaConsumer = new KafkaConsumer[String, String](properties)
kafkaConsumer.subscribe("firstTopic", "secondTopic")
while (true) {
val results = kafkaConsumer.poll(2000).asScala
for ((topic, data) <- results) {
// Do stuff
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
12399 次 |
| 最近记录: |