小编Hem*_*mil的帖子

如何仅处理文件中的新记录?

我有一个处理文件中的记录的场景。文件中的数据会定期(每毫秒)添加一次。所以我需要读取文件并处理它,同时仅处理新添加的记录。

我遇到了基于 Spark SQL 构建的 Spark 结构化流的概念。我正在做的是——

  1. 每1秒触发一次文件流处理
  2. 对文件运行 Spark SQL 查询
  3. 以附加模式在控制台上写入查询的输出。

下面是相同的代码 -

public static class SparkStreamer implements Runnable,Serializable {
    @Override
    public void run() {
        processDataStream();

    }

    private void processDataStream() {

        Dataset<Row> rowData = spark.readStream().format("Text").load("C:\\Test\\App\\");

        Dataset<String> data = rowData.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {

            @Override
            public Iterator<String> call(String row) throws Exception {
                return Arrays.asList(row.split("\\|")).iterator();
            }


        },Encoders.STRING());

        Dataset<Row> dataCount = data.select(new Column("value"));


        StreamingQuery query = dataCount.writeStream()
                  .outputMode("append")
                  .format("console")
                  .start();
        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            // TODO Auto-generated catch …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-structured-streaming

2
推荐指数
1
解决办法
1279
查看次数