如何使Spark Streaming计算单元测试中文件中的单词?

Emr*_*inç 5 java unit-testing apache-spark spark-streaming

我已经在Java中成功构建了一个非常简单的Spark Streaming应用程序,该应用程序基于Scala中HdfsCount示例.

当我将此应用程序提交给我的本地Spark时,它会等待将文件写入给定目录,当我创建该文件时,它会成功打印出单词数.我按Ctrl + C终止应用程序.

现在我已经尝试为这个功能创建一个非常基本的单元测试,但在测试中我无法打印相同的信息,即单词的数量.

我错过了什么?

下面是单元测试文件,之后我还包含了显示countWords方法的代码片段:

StarterAppTest.java

import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


import org.junit.*;

import java.io.*;

public class StarterAppTest {

  JavaStreamingContext ssc;
  File tempDir;

  @Before
  public void setUp() {
    ssc = new JavaStreamingContext("local", "test", new Duration(3000));
    tempDir = Files.createTempDir();
    tempDir.deleteOnExit();
  }

  @After
  public void tearDown() {
    ssc.stop();
    ssc = null;
  }

  @Test
  public void testInitialization() {
    Assert.assertNotNull(ssc.sc());
  }


  @Test
  public void testCountWords() {

    StarterApp starterApp = new StarterApp();

    try {
      JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
      JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines);

      ssc.start();

      File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
      PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
      writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
      writer.close();

      System.err.println("===== Word Counts =======");
      wordCounts.print();
      System.err.println("===== Word Counts =======");

    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (UnsupportedEncodingException e) {
      e.printStackTrace();
    }


    Assert.assertTrue(true);

  }

}
Run Code Online (Sandbox Code Playgroud)

这个测试编译并开始运行,Spark Streaming在控制台上打印了很多诊断消息,但调用wordCounts.print()不打印任何东西,而在StarterApp.java本身,它们确实如此.

我也尝试过添加ssc.awaitTermination();,ssc.start()但在这方面没有任何改变.之后我还试图在这个Spark Streaming应用程序正在检查的目录中手动创建一个新文件,但这次它给出了一个错误.

为完整起见,下面是wordCounts方法:

public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) {
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
              @Override
              public Tuple2<String, Integer> call(String s) { return new Tuple2<>(s, 1); }
            }).reduceByKey((i1, i2) -> i1 + i2);

    return wordCounts;
  }
Run Code Online (Sandbox Code Playgroud)

maa*_*asg 2

几点提示:

  • 为 SparkStreaming 上下文提供至少 2 个核心。1 用于流处理,1 用于 Spark 处理。“本地”->“本地[2]”
  • 您的流传输间隔为 3000 毫秒,因此在程序中的某个位置您需要至少等待该时间才能获得输出。
  • Spark Streaming 需要一些时间来设置侦听器。ssc.start该文件在发布后立即创建。不保证文件系统监听器已经就位。sleep(xx)之后我会做一些ssc.start

在流媒体中,一切都取决于正确的时机。