并行运行不同DStream的多个Spark Streaming作业

Adi*_*ber 8 apache-spark spark-streaming

我有一个Spark Streaming应用程序,它从多个Kafka主题中读取数据.每个主题都有不同类型的数据,因此需要不同的处理管道.

我最初的解决方案是为每个主题创建一个DStream:

def main(args: Array[String]) { 
    val streamingContext: StreamingContext = ...
    val topics = ...

    for (topic <- topics) {
        val offsets: Map[TopicAndPartition, Long] = ...
        val stream = KafkaUtils.createDirectStream[...](streamingContext, kafkaProperties, offsets, ...)
        configureStream(topic, stream)
    }

    streamingContext.addStreamingListener(new StreamingListener {
        override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
            // logic to save offsets after each batch completes
        }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
}


def configureStream(topic: String, stream: DStream[...]) {
    topic match {
        case "first" => stream.map(...).foreachRDD(...)
        case "second" => stream.map(...).foreachRDD(...)
        case "third" => stream.map(...).foreachRDD(...)
        // ...
    }
}
Run Code Online (Sandbox Code Playgroud)

在运行应用程序时,处理作业会一个接一个地计算,即使它们最初属于不同的DStream.

我尝试调整spark.streaming.concurrentJobs参数(如此处所述),但是当事情变得奇怪时:

  • 第一批处理更多数据(因为数据在流应用程序关闭时在Kafka中累积).处理时间比指定的批处理间隔长.
  • 第二批添加到队列中(第一批仍在运行),并立即开始处理.
  • 第二批(有时甚至是第三批)批次在第一批之前完成.

这可能会导致问题,例如在管理Kafka偏移时 - 流式监听器首先获得第二个/第三个批次的偏移(因为它首先完成)并保存它们.如果应用程序在完成第一批之前崩溃,则该数据将丢失.在另一种情况下,如果第一批完成并且应用程序之后崩溃,则重播第二批/第三批的数据.

有没有办法告诉Spark并行处理作业而不处理新批次?或者,也许,并行处理不同的DStream(即,一个DStream中的作业是线性处理的;并行地跨越不同的DStream)?

bes*_*hes 0

Dstream 无法做到这一点。

Spark 结构化流解决了这个问题。

您可以查看答案以获取更多信息吗?