了解Spark结构化流式并行

Kle*_*ios 6 apache-spark apache-spark-sql spark-structured-streaming

我是Spark世界中的新手,正在努力解决一些概念。

使用Kafka的Spark结构化流式采购时,并行性如何发生?

让我们考虑以下代码段代码:

SparkSession spark = SparkSession
          .builder()
          .appName("myApp")
          .getOrCreate();   

Dataset<VideoEventData> ds = spark
  .readStream()
  .format("kafka")
  ...

gDataset = ds.groupByKey(...)

pDataset = gDataset.mapGroupsWithState(
      ...
      /* process each key - values */
      loop values
        if value is valid - save key/value result in the HDFS
      ... 
)

StreamingQuery query = pDataset.writeStream()
          .outputMode("update")
          .format("console")
          .start();

//await
query.awaitTermination();
Run Code Online (Sandbox Code Playgroud)

我已经读到并行性与数据分区的数量有关,而数据集的分区数量是基于spark.sql.shuffle.partitions参数的。

  1. 对于每一批(从Kafka中抽出),提取的物品是否会在spark.sql.shuffle.partitions?例如,spark.sql.shuffle.partitions=5Batch1=100行,我们将最终得到5个分区,每个分区20行吗?

  2. 考虑到提供的代码段代码,由于groupByKey后面还有一个mapGroups/mapGroupsWithState函数,我们是否仍在利用Spark并行性?

更新:

在里面gDataset.mapGroupsWithState,我处理每个键/值并将结果存储在HDFS中。因此,输出接收器仅用于在控制台中输出某些统计信息。

Yuv*_*kov 7

对于每批(从Kafka中拉出),拉出的物品是否会在spark.sql.shuffle.partitions

一旦达到groupByKey随机混合边界,它们将被分割。最初检索数据时,分区数将等于Kafka分区数

考虑到提供的代码段,由于groupByKey和mapGroups / mapGroupsWithState函数,我们是否仍在利用Spark并行性

通常是,但是这还取决于您如何设置Kafka主题。尽管从代码中看不到,但Spark会在内部将数据分成不同的阶段,分成较小的任务,并将它们分配给集群中的可用执行程序。如果您的Kafka主题只有1个分区,则意味着在之前groupByKey,您的内部流将包含一个分区,该分区不会被并行化,而是在单个执行程序上执行。只要您的Kafka分区数大于1,您的处理就将并行进行。在shuffle边界之后,Spark将对数据进行重新分区以包含所指定的分区数量spark.sql.shuffle.partitions