标签: spark-skinning

在读取第一批数据后停止火花流动

我正在使用spark streaming来消费kafka消息.我想从kafka获取一些消息作为样本,而不是阅读所有消息.所以我想阅读一批消息,将它们返回给调用者并停止火花流.目前我在awaitTermination方法的spark流上下文方法中传递batchInterval时间.我现在不知道如何将处理过的数据从spark流返回给调用者.这是我目前正在使用的代码

def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
    if (params.contains("zookeeperQourum"))
      zkQuorum = params.get("zookeeperQourum").get
    if (params.contains("userGroup"))
      group = params.get("userGroup").get
    if (params.contains("topics"))
      topics = params.get("topics").get
    if (params.contains("numberOfThreads"))
      numThreads = params.get("numberOfThreads").get
    if (params.contains("sink"))
      sink = params.get("sink").get
    if (params.contains("batchInterval"))
      interval = params.get("batchInterval").get.toInt
    val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
    val ssc = new StreamingContext(sparkConf, Seconds(interval))
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    var consumerConfig = scala.collection.immutable.Map.empty[String, String]
    consumerConfig += ("auto.offset.reset" -> "smallest")
    consumerConfig += ("zookeeper.connect" -> zkQuorum)
    consumerConfig += ("group.id" -> group)
    var data = KafkaUtils.createStream[Array[Byte], Array[Byte], …
Run Code Online (Sandbox Code Playgroud)

spark-skinning apache-kafka

4
推荐指数
1
解决办法
2598
查看次数

标签 统计

apache-kafka ×1

spark-skinning ×1