在 Spark Structured Streaming 中指定“basePath”选项

lbz*_*lbz 6 java apache-spark spark-streaming

basePath在 Spark Structured Streaming (in Java) 中读取分区数据时是否可以设置该选项?我只想加载特定分区中的数据,例如basepath/x=1/,但我也想x作为列加载。设置basePath非流式数据帧的方式似乎不起作用。

这是一个最小的例子。我有一个包含以下数据的数据框:

+---+---+
|  a|  b|
+---+---+
|  1|  2|
|  3|  4|
+---+---+
Run Code Online (Sandbox Code Playgroud)

我将此作为 Parquet 文件写入名为x=1.

以下代码(带有常规的非流数据帧)工作正常:

Dataset<Row> data = sparkSession.read()
  .option("basePath", basePath)
  .parquet(basePath + "/x=1");

data.show();
Run Code Online (Sandbox Code Playgroud)

这产生了预期的结果:

+---+---+---+
|  a|  b|  x|
+---+---+---+
|  1|  2|  1|
|  3|  4|  1|
+---+---+---+
Run Code Online (Sandbox Code Playgroud)

但是,以下(使用 Structured Streaming API)不起作用:

StructType schema = data.schema(); // data as defined above

Dataset<Row> streamingData = sparkSession.readStream()
  .schema(schema)
  .option("basePath", basePath)
  .parquet(basePath + "/x=1");

streamingData.writeStream()
  .trigger(Trigger.Once())
  .format("console")
  .start().awaitTermination();
Run Code Online (Sandbox Code Playgroud)

在这种情况下,数据框不包含任何行:

+---+---+---+
|  a|  b|  x|
+---+---+---+
+---+---+---+
Run Code Online (Sandbox Code Playgroud)

Med*_*rik 1

我不确定这是否适用于 Spark 流,但它适用于我在 Scala 中的批处理。我会做的是我会basePath完全避免使用。例如,当我的数据按年/月/日进行分区,并且我想每天循环和处理时,我会使用字符串插值。

import java.text.SimpleDateFormat
import java.sql.Timestamp
import java.util.Calendar

var dateStart: String = "01/14/2012"
var dateStop: String = "01/18/2012"

var  format: SimpleDateFormat = new SimpleDateFormat("MM/dd/yyyy");


var d1 = new Timestamp(format.parse(dateStart).getTime());
var d2 = new Timestamp(format.parse(dateStop).getTime());

var diffDays:Long = (d2.getTime() - d1.getTime()) / (24 * 60 * 60 * 1000)

var cal:Calendar = Calendar.getInstance()
cal.setTimeInMillis(d1.getTime())
for (i <- 0 to diffDays.toInt){
    val year = cal.get(Calendar.YEAR)
    val month = cal.get(Calendar.MONTH)
    val day = cal.get(Calendar.DAY_OF_MONTH)
    var dataframe1 = spark.read
           .load(s"s3://bucketName/somepath/year=$year/month=$month/day=$day")
    /*
    Do your dataframe manipulation here
    */
    cal.add(Calendar.DAY_OF_YEAR, 1)
}
Run Code Online (Sandbox Code Playgroud)

您也可以使用字符串或整数列表来执行此操作。如果您需要将该数据视为一列,则始终可以将其作为新列附加到数据框。我不确定这是否适用于您的 Spark 流媒体案例。