Spark (Scala) Structured Streaming aggregation and self-join

red*_*dsk 5 inner-join apache-spark spark-structured-streaming

I'm trying to perform an aggregation followed by a self-join on a Structured Streaming DataFrame. Suppose the source DataFrame looks like this:

sourceDf.show(false)
+-----+-------+
|owner|fruits |
+-----+-------+
|Brian|apple  |
|Brian|pear   |
|Brian|melon  |
|Brian|avocado|
|Bob  |avocado|
|Bob  |apple  |
+-----+-------+

On a static DataFrame, this is easy:

val aggDf = sourceDf.groupBy($"owner").agg(collect_list(col("fruits")) as "fruitsA")
sourceDf.join(aggDf, Seq("owner")).show(false)
+-----+-------+-----------------------------+
|owner|fruits |fruitsA                      |
+-----+-------+-----------------------------+
|Brian|apple  |[apple, pear, melon, avocado]|
|Brian|pear   |[apple, pear, melon, avocado]|
|Brian|melon  |[apple, pear, melon, avocado]|
|Brian|avocado|[apple, pear, melon, avocado]|
|Bob  |avocado|[avocado, apple]             |
|Bob  |apple  |[avocado, apple]             |
+-----+-------+-----------------------------+

Unfortunately, I can't figure out how to do the same on a streaming DataFrame. So I tried the following complete code, which uses Kafka for both the source and the sink:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructType}


object Test {

  val spark: SparkSession = SparkSession.builder().getOrCreate()
  import spark.implicits._

  val brokers = "kafka:9092"

  val inputTopic = "test.kafka.sink.input"
  val aggTopic = "test.kafka.sink.agg"
  val outputTopicSelf = "test.kafka.sink.output.self"
  val outputTopicTwo = "test.kafka.sink.output.two"

  val payloadSchema: StructType = new StructType()
    .add("owner", StringType)
    .add("fruits", StringType)

  val payloadSchemaA: StructType = new StructType()
    .add("owner", StringType)
    .add("fruitsA", StringType)

  var joinedDfSchema: StructType = _

  val sourceDf: DataFrame = Seq(
    ("Brian", "apple"),
    ("Brian", "pear"),
    ("Brian", "melon"),
    ("Brian", "avocado"),
    ("Bob", "avocado"),
    ("Bob", "apple")
  )
    .toDF("owner", "fruits")

  val additionalData: DataFrame = Seq(("Bob", "grapes")).toDF("owner", "fruits")

  def saveDfToKafka(df: DataFrame): Unit = {
    df
      .select(to_json(struct(df.columns.map(column): _*)).alias("value"))
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("topic", inputTopic)
      .save()
  }

  // save data to kafka (batch)
  saveDfToKafka(sourceDf)

  // kafka source
  val farmDF: DataFrame = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("startingOffsets", "earliest")
    .option("subscribe", inputTopic)
    .load()
    .byteArrayToString("value")
    .withColumn("value", from_json($"value", payloadSchema))
    .expand("value")

  farmDF.printSchema()

  implicit class DFHelper(df: DataFrame) {
    def expand(column: String): DataFrame = {
      val wantedColumns = df.columns.filter(_ != column) :+ s"$column.*"
      df.select(wantedColumns.map(col): _*)
    }

    def byteArrayToString(column: String): DataFrame = {
      val selectedCols = df.columns.filter(_ != column) :+ s"CAST($column AS STRING)"
      df.selectExpr(selectedCols: _*)
    }
  }

  def testSelfAggJoinFail(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .groupBy($"owner")
      .agg(collect_list(col("fruits")) as "fruitsA")

    // joined df
    val joinedDF = farmDF
      .join(myFarmDF.as("myFarmDF"), Seq("owner"))
      .select("owner", "fruits", "myFarmDF.fruitsA")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column): _*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointSelf")
      .option("topic", outputTopicSelf)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def testSelfAggJoin(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .withWatermark("timestamp", "30 seconds")
      .groupBy(
        window($"timestamp", "30 seconds", "15 seconds"),
        $"owner"
      )
      .agg(collect_list(col("fruits")) as "fruitsA")
      .select("owner", "fruitsA", "window")

    // joined df
    val joinedDF = farmDF
        .as("farmDF")
      .withWatermark("timestamp", "30 seconds")
      .join(
        myFarmDF.as("myFarmDF"),
        expr(
          """
            |farmDF.owner = myFarmDF.owner AND
            |farmDF.timestamp >= myFarmDF.window.start AND
            |farmDF.timestamp <= myFarmDF.window.end
          """.stripMargin))
      .select("farmDF.owner", "farmDF.fruits", "myFarmDF.fruitsA")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column): _*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointSelf")
      .option("topic", outputTopicSelf)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def testTwoDfAggJoin(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .withWatermark("timestamp", "30 seconds")
      .groupBy(
        $"owner"
      )
      .agg(collect_list(col("fruits")) as "fruitsA")
      .select("owner", "fruitsA")

    // save the aggregated df to kafka
    myFarmDF
      .select(to_json(struct(myFarmDF.columns.map(column):_*)).alias("value"))
      .writeStream
      .outputMode("update")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointAgg")
      .option("topic", aggTopic)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)

    // read the aggregated df from kafka as a stream
    val aggDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("startingOffsets", "earliest")
      .option("subscribe", aggTopic)
      .load()
      .byteArrayToString("value")
      .withColumn("value", from_json($"value", payloadSchemaA))
      .expand("value")
      .withWatermark("timestamp", "30 seconds")

    // joined df
    val joinedDF = farmDF
      .as("farmDF")
      .join(
        aggDF.as("myFarmDF"),
        expr(
          """
            |farmDF.owner = myFarmDF.owner AND
            |farmDF.timestamp >= myFarmDF.timestamp - interval 1 hour AND
            |farmDF.timestamp <= myFarmDF.timestamp + interval 1 hour
          """.stripMargin))
      .select("farmDF.owner", "myFarmDF.fruitsA", "farmDF.fruits")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column):_*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointTwo")
      .option("topic", outputTopicTwo)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def data(topic: String): DataFrame = {
    // let's read back the output topic using kafka batch
    spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .load()
      .byteArrayToString("value")
      .withColumn("value", from_json($"value", joinedDfSchema))
      .expand("value")
  }
}

Now, if I run the test on the streaming DataFrame:

scala> Test.testSelfAggJoinFail
org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Project [structstojson(named_struct(owner, owner#59, fruits, fruits#60, fruitsA, fruitsA#78), Some(Etc/UTC)) AS value#96]
+- Project [owner#59, fruits#60, fruitsA#78]
   +- Project [owner#59, key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, fruits#60, fruitsA#78]
      +- Join Inner, (owner#59 = owner#82)
         :- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, value#51.owner AS owner#59, value#51.fruits AS fruits#60]
         :  +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, jsontostructs(StructField(owner,StringType,true), StructField(fruits,StringType,true), value#43, Some(Etc/UTC), true) AS value#51]
         :     +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, cast(value#30 as string) AS value#43]
         :        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@3269e790, kafka, Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092), [key#29, value#30, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@42eeb996,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#22, value#23, topic#24, partition#25, offset#26L, timestamp#27, timestampType#28]
         +- SubqueryAlias myFarmDF
            +- Aggregate [owner#82], [owner#82, collect_list(fruits#83, 0, 0) AS fruitsA#78]
               +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, value#51.owner AS owner#82, value#51.fruits AS fruits#83]
                  +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, jsontostructs(StructField(owner,StringType,true), StructField(fruits,StringType,true), value#43, Some(Etc/UTC), true) AS value#51]
                     +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, cast(value#30 as string) AS value#43]
                        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@3269e790, kafka, Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092), [key#29, value#30, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@42eeb996,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#22, value#23, topic#24, partition#25, offset#26L, timestamp#27, timestampType#28]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:110)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:235)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
  at Test$.testSelfAggJoinFail(<console>:123)
  ... 51 elided

It fails with "Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark", because I don't use a watermark.

Now, if I run the second test:

Test.testSelfAggJoin

I get these warnings:

2018-09-12 16:07:33 WARN  StreamingJoinHelper:66 - Failed to extract state value watermark from condition (window#70-T30000ms.start - timestamp#139-T30000ms) due to window#70-T30000ms.start
2018-09-12 16:07:33 WARN  StreamingJoinHelper:66 - Failed to extract state value watermark from condition (timestamp#139-T30000ms - window#70-T30000ms.end) due to window#70-T30000ms.end
2018-09-12 16:07:33 WARN  StreamingJoinHelper:66 - Failed to extract state value watermark from condition (window#70-T30000ms.start - timestamp#139-T30000ms) due to window#70-T30000ms.start
2018-09-12 16:07:33 WARN  StreamingJoinHelper:66 - Failed to extract state value watermark from condition (timestamp#139-T30000ms - window#70-T30000ms.end) due to window#70-T30000ms.end

and when I check the result:

Test.data(Test.outputTopicSelf).show(false)
2018-09-12 16:08:01 WARN  NetworkClient:882 - [Consumer clientId=consumer-5, groupId=spark-kafka-relation-02f5512f-cc3c-40ad-938f-e3dfdca95f8c-driver-0] Error while fetching metadata with correlation id 2 : {test.kafka.sink
.output.self=LEADER_NOT_AVAILABLE}
2018-09-12 16:08:01 WARN  NetworkClient:882 - [Consumer clientId=consumer-5, groupId=spark-kafka-relation-02f5512f-cc3c-40ad-938f-e3dfdca95f8c-driver-0] Error while fetching metadata with correlation id 6 : {test.kafka.sink
.output.self=LEADER_NOT_AVAILABLE}
+---+-----+---------+------+---------+-------------+-----+------+-------+
|key|topic|partition|offset|timestamp|timestampType|owner|fruits|fruitsA|
+---+-----+---------+------+---------+-------------+-----+------+-------+
+---+-----+---------+------+---------+-------------+-----+------+-------+

it returns an empty DataFrame (possibly because of the warnings?). I could not find a working solution for the self-join.

Finally, I tried sinking the aggregation to Kafka and reading it back as a second streaming DataFrame, like this:

scala> Test.data(Test.outputTopicTwo).show(false)
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|key |topic                     |partition|offset|timestamp              |timestampType|owner|fruitsA                           |fruits |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|null|test.kafka.sink.output.two|0        |0     |2018-09-12 16:57:04.376|0            |Brian|["avocado","apple","pear","melon"]|avocado|
|null|test.kafka.sink.output.two|0        |1     |2018-09-12 16:57:04.376|0            |Bob  |["apple","avocado"]               |apple  |
|null|test.kafka.sink.output.two|0        |2     |2018-09-12 16:57:04.38 |0            |Brian|["avocado","apple","pear","melon"]|apple  |
|null|test.kafka.sink.output.two|0        |3     |2018-09-12 16:57:04.38 |0            |Bob  |["apple","avocado"]               |avocado|
|null|test.kafka.sink.output.two|0        |4     |2018-09-12 16:57:04.381|0            |Brian|["avocado","apple","pear","melon"]|pear   |
|null|test.kafka.sink.output.two|0        |5     |2018-09-12 16:57:04.382|0            |Brian|["avocado","apple","pear","melon"]|melon  |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+

This works (although not very efficiently, I'd say), but if I add extra data to the source topic:

scala> Test.saveDfToKafka(Test.additionalData)
scala> Test.data(Test.outputTopicTwo).show(false)
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|key |topic                     |partition|offset|timestamp              |timestampType|owner|fruitsA                           |fruits |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|null|test.kafka.sink.output.two|0        |0     |2018-09-12 16:57:04.376|0            |Brian|["avocado","apple","pear","melon"]|avocado|
|null|test.kafka.sink.output.two|0        |1     |2018-09-12 16:57:04.376|0            |Bob  |["apple","avocado"]               |apple  |
|null|test.kafka.sink.output.two|0        |2     |2018-09-12 16:57:04.38 |0            |Brian|["avocado","apple","pear","melon"]|apple  |
|null|test.kafka.sink.output.two|0        |3     |2018-09-12 16:57:04.38 |0            |Bob  |["apple","avocado"]               |avocado|
|null|test.kafka.sink.output.two|0        |4     |2018-09-12 16:57:04.381|0            |Brian|["avocado","apple","pear","melon"]|pear   |
|null|test.kafka.sink.output.two|0        |5     |2018-09-12 16:57:04.382|0            |Brian|["avocado","apple","pear","melon"]|melon  |
|null|test.kafka.sink.output.two|0        |6     |2018-09-12 16:59:37.125|0            |Bob  |["apple","avocado"]               |grapes |
|null|test.kafka.sink.output.two|0        |7     |2018-09-12 16:59:40.001|0            |Bob  |["apple","avocado","grapes"]      |apple  |
|null|test.kafka.sink.output.two|0        |8     |2018-09-12 16:59:40.002|0            |Bob  |["apple","avocado","grapes"]      |avocado|
|null|test.kafka.sink.output.two|0        |9     |2018-09-12 16:59:40.002|0            |Bob  |["apple","avocado","grapes"]      |grapes |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+

I get extra rows, probably because I have to use .outputMode("update") when sinking the aggregated DataFrame.

  • Is there a way to perform this aggregation without sending the aggregated data back to Kafka as a separate topic?
  • If not, is it possible to modify testTwoDfAggJoin to use .outputMode("append")?

小智 0

I ran into a similar error message. The outputMode matters when there is an aggregation; I fixed it by adding df.writeStream.outputMode(OutputMode.Update()) or df.writeStream.outputMode(OutputMode.Complete()).
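
A minimal sketch of what that suggestion could look like against the aggregation from the question (assuming the myFarmDF, brokers and aggTopic definitions from the code above; the checkpoint path is a placeholder):

import org.apache.spark.sql.streaming.OutputMode

// Sketch only: sink the streaming aggregation in update mode, so that only the
// groups that changed since the last trigger are emitted. Append mode would
// additionally require a watermark so Spark knows when a group is final.
myFarmDF
  .select(to_json(struct(myFarmDF.columns.map(col): _*)).alias("value"))
  .writeStream
  .outputMode(OutputMode.Update())                             // or OutputMode.Complete()
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("topic", aggTopic)
  .option("checkpointLocation", "/tmp/checkpoint-agg-update")  // placeholder path
  .start()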

Reference:

Output modes. There are several types of output mode.

Append mode (default) - This is the default mode, where only the new rows added to the Result Table since the last trigger will be output to the sink. This is supported only for those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once (assuming a fault-tolerant sink). For example, queries with only select, where, map, flatMap, filter, join, etc. will support Append mode.

Complete mode - The whole Result Table will be output to the sink after every trigger. This is supported for aggregation queries.

Update mode - (available since Spark 2.1.1) Only the rows in the Result Table that were updated since the last trigger will be output to the sink. More information to be added in future releases.

http://blog.madhukaraphatak.com/introduction-to-spark-structed-streaming-part-3/
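
On the second question: append mode on a streaming aggregation is only accepted when the aggregation is bounded by a watermark, because Spark then knows when a group is final and can emit it exactly once. A minimal sketch of a windowed aggregation that append mode accepts (assuming the farmDF, brokers and aggTopic definitions from the question, with the Kafka record timestamp used as event time; window size and checkpoint path are placeholders):

import org.apache.spark.sql.streaming.OutputMode

// Sketch only: each (window, owner) group becomes final once the watermark
// passes the end of its window, so append mode can emit it exactly once.
val appendableAgg = farmDF
  .withWatermark("timestamp", "30 seconds")               // event time = Kafka record timestamp
  .groupBy(window($"timestamp", "1 minute"), $"owner")    // placeholder window size
  .agg(collect_list($"fruits") as "fruitsA")

appendableAgg
  .select(to_json(struct(appendableAgg.columns.map(col): _*)).alias("value"))
  .writeStream
  .outputMode(OutputMode.Append())
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("topic", aggTopic)
  .option("checkpointLocation", "/tmp/checkpoint-agg-append")  // placeholder path
  .start()

Note that with append mode the rows only reach the sink after the watermark passes the end of each window, i.e. with a delay of at least the window length plus the watermark threshold.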