How do I write a stream to S3 using the year, month, and day of the received records?

Tags: scala, apache-spark, spark-structured-streaming

I have a simple stream that reads some data from a Kafka topic:

import org.apache.spark.sql.functions.from_json
import spark.implicits._  // enables the $"column" syntax

val ds = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()

// Cast the binary Kafka value to a string, parse it as JSON,
// and flatten the parsed struct into top-level columns.
// `schema` is the StructType of the JSON payload.
val df = ds.selectExpr("cast (value as string) as json")
  .select(from_json($"json", schema).as("data"))
  .select("data.*")
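
For reference, `schema` in the snippet above is assumed to be a StructType describing the JSON payload. A minimal sketch with hypothetical field names:

import org.apache.spark.sql.types._

// Hypothetical payload schema; replace the fields with those of the real JSON.
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("payload", StringType),
  StructField("ts", TimestampType)
))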

I want to store this data in S3 based on the date it was received, e.g.:

s3_bucket/year/month/day/data.json

When I want to write the data, I do the following:

df.writeStream
  .format("json")
  .outputMode("append")
  .option("path", s3_path)
  // note: a checkpoint location is also required for file sinks, e.g.
  // .option("checkpointLocation", checkpoint_path)
  .start()

But this way I can only specify a single path. Is there a way to change the S3 path dynamically based on the date?

Answer (6 votes)

Use the partitionBy clause:

import org.apache.spark.sql.functions._

df.select(
    dayofmonth(current_date()) as "day",
    month(current_date()) as "month",
    year(current_date()) as "year",
    $"*")
  .writeStream
  .partitionBy("year", "month", "day")
  ... // all other options
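
Note that current_date() is evaluated at processing time, so records get partitioned by the date they are written out, not necessarily the date they were received. If you want each record partitioned by the time Kafka received it, the Kafka source exposes a per-record timestamp column that you can derive the partition columns from instead. A minimal sketch, reusing schema and s3_path from the question and a hypothetical checkpoint_path:

import org.apache.spark.sql.functions._
import spark.implicits._

// Keep the Kafka ingestion timestamp alongside the parsed payload and
// derive year/month/day partition columns from it.
val partitioned = ds
  .selectExpr("cast (value as string) as json", "timestamp")
  .select(from_json($"json", schema).as("data"), $"timestamp")
  .select($"data.*",
    year($"timestamp") as "year",
    month($"timestamp") as "month",
    dayofmonth($"timestamp") as "day")

partitioned.writeStream
  .format("json")
  .outputMode("append")
  .option("path", s3_path)
  .option("checkpointLocation", checkpoint_path) // hypothetical checkpoint path
  .partitionBy("year", "month", "day")
  .start()

Either way, keep in mind that partitionBy produces Hive-style directories such as s3_bucket/year=2018/month=5/day=1/... rather than s3_bucket/2018/5/1/....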