wip*_*man 9 scala actor apache-kafka spark-streaming
我希望有一个消费者演员订阅Kafka主题并流式传输数据,以便在消费者之外使用Spark Streaming进行进一步处理.为什么演员呢?因为我读到它的主管策略是处理Kafka失败的好方法(例如,重启失败).
我找到了两个选择:
KafkaConsumer类:它的poll()方法返回一个Map[String, Object].我希望像我一样DStream返回KafkaUtils.createDirectStream,而且我不知道如何从actor外部获取流.ActorHelper特性并使用actorStream()如此示例中所示.后一个选项不显示与主题的连接,而是显示与套接字的连接.有人能指出我正确的方向吗?
为了处理 Kafka 故障,我使用了 Apache Curator 框架和以下解决方法:
val client: CuratorFramework = ... // see docs
val zk: CuratorZookeeperClient = client.getZookeeperClient
/**
* This method returns false if kafka or zookeeper is down.
*/
def isKafkaAvailable:Boolean =
Try {
if (zk.isConnected) {
val xs = client.getChildren.forPath("/brokers/ids")
xs.size() > 0
}
else false
}.getOrElse(false)
Run Code Online (Sandbox Code Playgroud)
为了使用 Kafka 主题,我使用了该com.softwaremill.reactivekafka库。例如:
class KafkaConsumerActor extends Actor {
val kafka = new ReactiveKafka()
val config: ConsumerProperties[Array[Byte], Any] = ... // see docs
override def preStart(): Unit = {
super.preStart()
val publisher = kafka.consume(config)
Source.fromPublisher(publisher)
.map(handleKafkaRecord)
.to(Sink.ignore).run()
}
/**
* This method will be invoked when any kafka records will happen.
*/
def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]) = {
// handle record
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
285 次 |
| 最近记录: |