Spark流式传输:如何向DStream中添加更多分区?

voi*_*oid 2 partitioning hadoop-yarn apache-spark spark-streaming spark-dataframe

我有一个火花流应用程序,看起来像这样:

val message = KafkaUtils.createStream(...).map(_._2)

message.foreachRDD( rdd => {

  if (!rdd.isEmpty){
    val kafkaDF = sqlContext.read.json(rdd)

    kafkaDF.foreachPartition(
      i =>{
        createConnection()
        i.foreach(
          row =>{
            connection.sendToTable()
          }
        )
        closeConnection()
      }
    )
Run Code Online (Sandbox Code Playgroud)

而且,我使用

spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....
Run Code Online (Sandbox Code Playgroud)

当我尝试登录时kafkaDF.rdd.partitions.size,结果多数为'1'或'5'。我很困惑,是否可以控制DataFrame的分区数量?KafkaUtils.createStream似乎不接受与我想要的rdd分区数量有关的任何参数。我试过了kafkaDF.rdd.repartition( int ),但似乎也不起作用。

如何在代码中实现更多的并行性?如果我的方法是错误的,正确的方法是什么?

mar*_*ios 5

在Spark Streaming中,可以在两个方面实现并行:(a)使用者/接收者(在您的情况下为Kafka使用者),以及(b)处理(由Spark完成)。

默认情况下,Spark流将为每个使用者分配一个内核(即Thread)。因此,如果需要摄取更多数据,则需要创建更多使用者。每个使用者将创建一个DStream。然后,您可以联合DStreams以获得一个大型流。

// A basic example with two threads for consumers
val messageStream1 = KafkaUtils.createStream(...) // say, reading topic A
val messageStream2 = KafkaUtils.createStream(...) // and this one reading topic B

val combineStream = messageStream1.union(messageStream2)
Run Code Online (Sandbox Code Playgroud)

或者,可以通过重新划分输入流来增加接收者/消费者的数量:

inputStream.repartition(<number of partitions>))
Run Code Online (Sandbox Code Playgroud)

流应用程序可用的所有其余核心将分配给Spark。

因此,如果您拥有N核心(通过定义spark.cores.max),并且您拥有C使用方,那么您将N-C获得适用于Spark的核心。

#Partitions =~  #Consumers x (batch duration / block interval)
Run Code Online (Sandbox Code Playgroud)

块间​​隔 =消费者在推送作为火花块(定义为configuration spark.streaming.blockInterval)创建的数据之前等待的时间。

请始终记住,Spark Streaming具有两个经常发生的功能。一组读取当前微批处理的线程(消费者),以及一组处理前一个微批处理的线程(火花)。

有关性能调优的更多技巧,请参考此处此处此处