voi*_*oid 2 partitioning hadoop-yarn apache-spark spark-streaming spark-dataframe
我有一个火花流应用程序,看起来像这样:
val message = KafkaUtils.createStream(...).map(_._2)
message.foreachRDD( rdd => {
if (!rdd.isEmpty){
val kafkaDF = sqlContext.read.json(rdd)
kafkaDF.foreachPartition(
i =>{
createConnection()
i.foreach(
row =>{
connection.sendToTable()
}
)
closeConnection()
}
)
Run Code Online (Sandbox Code Playgroud)
而且,我使用
spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....
Run Code Online (Sandbox Code Playgroud)
当我尝试登录时kafkaDF.rdd.partitions.size,结果多数为'1'或'5'。我很困惑,是否可以控制DataFrame的分区数量?KafkaUtils.createStream似乎不接受与我想要的rdd分区数量有关的任何参数。我试过了kafkaDF.rdd.repartition( int ),但似乎也不起作用。
如何在代码中实现更多的并行性?如果我的方法是错误的,正确的方法是什么?
在Spark Streaming中,可以在两个方面实现并行:(a)使用者/接收者(在您的情况下为Kafka使用者),以及(b)处理(由Spark完成)。
默认情况下,Spark流将为每个使用者分配一个内核(即Thread)。因此,如果需要摄取更多数据,则需要创建更多使用者。每个使用者将创建一个DStream。然后,您可以联合DStreams以获得一个大型流。
// A basic example with two threads for consumers
val messageStream1 = KafkaUtils.createStream(...) // say, reading topic A
val messageStream2 = KafkaUtils.createStream(...) // and this one reading topic B
val combineStream = messageStream1.union(messageStream2)
Run Code Online (Sandbox Code Playgroud)
或者,可以通过重新划分输入流来增加接收者/消费者的数量:
inputStream.repartition(<number of partitions>))
Run Code Online (Sandbox Code Playgroud)
流应用程序可用的所有其余核心将分配给Spark。
因此,如果您拥有N核心(通过定义spark.cores.max),并且您拥有C使用方,那么您将N-C获得适用于Spark的核心。
#Partitions =~ #Consumers x (batch duration / block interval)
Run Code Online (Sandbox Code Playgroud)
块间隔 =消费者在推送作为火花块(定义为configuration spark.streaming.blockInterval)创建的数据之前等待的时间。
请始终记住,Spark Streaming具有两个经常发生的功能。一组读取当前微批处理的线程(消费者),以及一组处理前一个微批处理的线程(火花)。
| 归档时间: |
|
| 查看次数: |
1407 次 |
| 最近记录: |