I am writing a test application that consumes messages from Kafka topics and then pushes the data to S3 and into an RDBMS table (the flow is similar to the one described here: https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html). So I read the data from Kafka, and I have something like this:
Dataset<Row> df = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1,topic2,topic3")
    .option("startingOffsets", "earliest")
    .load()
    .select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"));
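For context, schema and jsonOptions are just a StructType and a Map<String, String> defined earlier (that is the overload of from_json I am using); roughly along these lines, where the field names are placeholders rather than my real schema:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import java.util.HashMap;
import java.util.Map;

// Placeholder schema for the JSON carried in the Kafka "value" field
StructType schema = new StructType()
        .add("messageType", DataTypes.StringType)
        .add("payload", DataTypes.StringType)
        .add("eventTime", DataTypes.TimestampType);

// Options passed through to the JSON parser
Map<String, String> jsonOptions = new HashMap<>();
jsonOptions.put("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSS");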
(Note that I am reading from multiple Kafka topics.) Next I define the datasets I need:
Dataset<Row> allMessages = df.select(.....);
Dataset<Row> messagesOfType1 = df.select(); // some unique conditions applied on JSON elements
Dataset<Row> messagesOfType2 = df.select(); // some other unique conditions
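Just to illustrate what those conditions look like (assuming, purely as a placeholder, that the parsed JSON had a messageType field), the per-type selections are of this shape:

import static org.apache.spark.sql.functions.col;

Dataset<Row> allMessages = df.select(col("parsed_value.*"));
Dataset<Row> messagesOfType1 = allMessages.filter(col("messageType").equalTo("type1"));
Dataset<Row> messagesOfType2 = allMessages.filter(col("messageType").equalTo("type2"));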
Now, for each dataset, I create a query to start the processing:
StreamingQuery s3Query = allMessages
    .writeStream()
    .format("parquet")
    .option("startingOffsets", "latest")
    .option("path", "s3_location")
    .start();
StreamingQuery firstQuery = messagesOfType1
    .writeStream()
    .foreach(new CustomForEachWiriterType1()) // class that extends ForeachWriter<Row> and saves data into an external RDBMS table
    …
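For context, the writer class is roughly of this shape (a simplified sketch; the JDBC URL, credentials, table and INSERT statement below are placeholders, not my actual code):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

public class CustomForEachWiriterType1 extends ForeachWriter<Row> {

    private Connection connection;
    private PreparedStatement statement;

    @Override
    public boolean open(long partitionId, long version) {
        try {
            // Placeholder JDBC URL and credentials
            connection = DriverManager.getConnection("jdbc:postgresql://dbhost:5432/testdb", "user", "password");
            // Placeholder target table and columns
            statement = connection.prepareStatement("INSERT INTO messages_type1 (payload) VALUES (?)");
            return true;
        } catch (Exception e) {
            return false; // skip this partition if the connection cannot be opened
        }
    }

    @Override
    public void process(Row row) {
        try {
            statement.setString(1, row.mkString(","));
            statement.executeUpdate();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close(Throwable errorOrNull) {
        try {
            if (statement != null) statement.close();
            if (connection != null) connection.close();
        } catch (Exception e) {
            // ignore failures on close in this sketch
        }
    }
}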