我有一个Spark Streaming应用程序,它从多个Kafka主题中读取数据.每个主题都有不同类型的数据,因此需要不同的处理管道.
我最初的解决方案是为每个主题创建一个DStream:
def main(args: Array[String]) {
val streamingContext: StreamingContext = ...
val topics = ...
for (topic <- topics) {
val offsets: Map[TopicAndPartition, Long] = ...
val stream = KafkaUtils.createDirectStream[...](streamingContext, kafkaProperties, offsets, ...)
configureStream(topic, stream)
}
streamingContext.addStreamingListener(new StreamingListener {
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
// logic to save offsets after each batch completes
}
})
streamingContext.start()
streamingContext.awaitTermination()
}
def configureStream(topic: String, stream: DStream[...]) {
topic match {
case "first" => stream.map(...).foreachRDD(...)
case "second" => stream.map(...).foreachRDD(...)
case "third" => stream.map(...).foreachRDD(...) …Run Code Online (Sandbox Code Playgroud) 我正在尝试编写一个 Spark Structured Streaming 作业,该作业从多个 Kafka 主题(可能是 100 个)读取并根据主题名称将结果写入 S3 上的不同位置。我开发了这段代码,它当前从多个主题中读取并将结果输出到控制台(基于循环),它按预期工作。但是,我想了解性能影响是什么。这是推荐的方法吗?不建议有多个 readStream 和 writeStream 操作吗?如果是这样,推荐的方法是什么?
my_topics = ["topic_1", "topic_2"]
for i in my_topics:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", bootstrap_servers) \
.option("subscribePattern", i) \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
output_df = df \
.writeStream \
.format("console") \
.option("truncate", False) \
.outputMode("update") \
.option("checkpointLocation", "s3://<MY_BUCKET>/{}".format(i)) \
.start()
Run Code Online (Sandbox Code Playgroud) apache-kafka apache-spark pyspark spark-structured-streaming
我必须使用以下用例设计一个Spark Streaming应用程序。我正在为此寻找最佳方法。
我有一个将数据推入1000多个不同主题的应用程序,每个主题都有不同的用途。Spark流式处理将从每个主题接收数据,并且在处理之后,它将回写到相应的另一个主题。
Ex.
Input Type 1 Topic --> Spark Streaming --> Output Type 1 Topic
Input Type 2 Topic --> Spark Streaming --> Output Type 2 Topic
Input Type 3 Topic --> Spark Streaming --> Output Type 3 Topic
.
.
.
Input Type N Topic --> Spark Streaming --> Output Type N Topic and so on.
Run Code Online (Sandbox Code Playgroud)
我需要回答以下问题。
你们还有其他问题吗?
非常感谢您的回应。