dej*_*jan 7 apache-spark spark-structured-streaming
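The Structured Streaming code below watermarks and windows the data over a 24-hour window with a 15-minute slide. In Append mode the code produces only an empty Batch 0. In Update mode the results are displayed correctly. Append mode is required because the S3 sink works only in Append mode.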
String windowDuration = "24 hours";
String slideDuration = "15 minutes";
Dataset<Row> sliding24h = rowData
    .withWatermark(eventTimeCol, slideDuration)
    .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
        col(nameCol)).count();
sliding24h
    .writeStream()
    .format("console")
    .option("truncate", false)
    .option("numRows", 1000)
    .outputMode(OutputMode.Append())
    //.outputMode(OutputMode.Complete())
    .start()
    .awaitTermination();
Here is the complete test code:
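import static org.apache.spark.sql.functions.col;

import java.util.ArrayList;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.execution.streaming.MemoryStream;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.collection.JavaConversions;

// The enclosing class name is hypothetical; the original post shows only main().
public class SlidingWindowAppendTest {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
        // 200 test records "epochSeconds,name", spaced 5 minutes apart.
        ArrayList<String> rl = new ArrayList<>();
        for (int i = 0; i < 200; ++i) {
            long t = 1512164314L + i * 5 * 60;
            rl.add(t + ",qwer");
        }
        String nameCol = "name";
        String eventTimeCol = "eventTime";
        String eventTimestampCol = "eventTimestamp";
        // All records are added to the in-memory source at once, so they arrive in a single batch.
        MemoryStream<String> input = new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING());
        input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
        Dataset<Row> stream = input.toDF().selectExpr(
            "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
            "cast(split(value,'[,]')[1] as String) as " + nameCol);
        System.out.println("isStreaming: " + stream.isStreaming());
        // Convert epoch seconds to a timestamp column used as event time.
        Column eventTime = functions.to_timestamp(col(eventTimestampCol));
        Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);
        String windowDuration = "24 hours";
        String slideDuration = "15 minutes";
        Dataset<Row> sliding24h = rowData
            .withWatermark(eventTimeCol, slideDuration)
            .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
                col(nameCol)).count();
        sliding24h
            .writeStream()
            .format("console")
            .option("truncate", false)
            .option("numRows", 1000)
            .outputMode(OutputMode.Append())
            //.outputMode(OutputMode.Complete())
            .start()
            .awaitTermination();
    }
}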
This bug was fixed in Spark 2.4.0; see https://issues.apache.org/jira/browse/SPARK-26167 and https://issues.apache.org/jira/browse/SPARK-24156.
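For intuition on why Append mode printed nothing, here is a rough plain-Java sketch (no Spark API involved) of the rule as I understand it: in Append mode a window is emitted only once the watermark has passed its end, and the watermark used by a batch is the one computed from earlier batches. The class and method names are hypothetical, and the `end <= watermark` predicate and the epoch-aligned windows are my assumptions about Spark's behavior, not code taken from Spark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of the Spark API.
public class AppendModeSketch {
    static final long WINDOW = 24 * 60 * 60; // window length in seconds
    static final long SLIDE = 15 * 60;       // slide interval in seconds
    static final long DELAY = 15 * 60;       // watermark delay in seconds

    /** Start times of every sliding window [start, start + WINDOW) containing eventTime. */
    static List<Long> windowStarts(long eventTime) {
        List<Long> starts = new ArrayList<>();
        // Assumption: windows are aligned to the epoch on multiples of SLIDE.
        long lastStart = eventTime - Math.floorMod(eventTime, SLIDE);
        for (long s = lastStart; s > eventTime - WINDOW; s -= SLIDE) {
            starts.add(s);
        }
        return starts;
    }

    /** Assumed Append-mode rule: a window is final once its end is at or before the watermark. */
    static boolean isFinal(long windowStart, long watermark) {
        return windowStart + WINDOW <= watermark;
    }

    /** Number of windows containing eventTime that are final under the given watermark. */
    static int countFinal(long eventTime, long watermark) {
        int n = 0;
        for (long s : windowStarts(eventTime)) {
            if (isFinal(s, watermark)) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        long first = 1512164314L;                    // first event in the test data
        long maxEventTime = first + 199L * 5 * 60;   // last of the 200 events
        long watermarkDuringBatch0 = 0L;             // no watermark observed yet
        long watermarkAfterBatch0 = maxEventTime - DELAY;

        System.out.println("final windows during batch 0: "
            + countFinal(first, watermarkDuringBatch0));
        System.out.println("final windows in a follow-up batch: "
            + countFinal(first, watermarkAfterBatch0));
    }
}
```

If this model is right, Batch 0 sees all the data but no window is final yet, and the windows only become final under the watermark computed afterwards. Since the test adds all of its data up front, no second batch ever runs on versions before 2.4.0, which would explain the single empty batch. As far as I can tell, SPARK-24156 addresses this by allowing batches to run even when no new data has arrived.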