Hem*_*mil 2 apache-spark apache-spark-sql spark-structured-streaming
我有一个处理文件中的记录的场景。文件中的数据会定期(每毫秒)添加一次。所以我需要读取文件并处理它,同时仅处理新添加的记录。
我遇到了基于 Spark SQL 构建的 Spark 结构化流的概念。我正在做的是——
下面是相同的代码 -
public static class SparkStreamer implements Runnable,Serializable {
@Override
public void run() {
processDataStream();
}
private void processDataStream() {
Dataset<Row> rowData = spark.readStream().format("Text").load("C:\\Test\\App\\");
Dataset<String> data = rowData.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String row) throws Exception {
return Arrays.asList(row.split("\\|")).iterator();
}
},Encoders.STRING());
Dataset<Row> dataCount = data.select(new Column("value"));
StreamingQuery query = dataCount.writeStream()
.outputMode("append")
.format("console")
.start();
try {
query.awaitTermination();
} catch (StreamingQueryException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
Run Code Online (Sandbox Code Playgroud)
通过上述实现,查询执行了 1 次,但如果我在文件中添加新记录,则不会触发第二批执行。
其他观察结果:
有人可以帮助解决这个问题吗?Spark Structured Streaming 是否支持处理文件中的数据,因为普通 Spark Streaming 不支持。
Spark Structured Streaming是否支持处理文件中的数据
是的。
查询执行了 1 次,但如果我在文件中添加新记录,则不会触发第二批执行。
一旦文件被标记为已看到并且不再被处理,那么在处理文件后就不会再工作了(查看负责它的FileStreamSource以了解它在幕后的工作原理)。
推荐的解决方案是将新内容写入新文件。
| 归档时间: |
|
| 查看次数: |
1279 次 |
| 最近记录: |