Simple Apache Spark Streaming application not working properly

Luc*_*cio 3 apache-spark spark-streaming

I'm running into the following problem with the Apache Spark Streaming library. I rewrote the simple "word count" example as a standalone application to see how streaming works. Here is the code:

    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

    // Create a DStream that will connect to hostname:port, like localhost:9999
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

    // Split each line into words
    JavaDStream<String> words = lines.flatMap(
      new FlatMapFunction<String, String>() {
        @Override public Iterable<String> call(String x) {
          return Arrays.asList(x.split(" "));
        }
      });

    // Count each word in each batch
    JavaPairDStream<String, Integer> pairs = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override public Tuple2<String, Integer> call(String s) throws Exception {
          return new Tuple2<String, Integer>(s, 1);
        }
      });

    JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override public Integer call(Integer i1, Integer i2) throws Exception {
          return i1 + i2;
        }
      });

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print();

    jssc.start();              // Start the computation
    jssc.awaitTermination();   // Wait for the computation to terminate
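For reference, the per-batch logic of the pipeline above (split into words, pair each word with 1, reduce by key) can be sketched in plain Java 8 streams, with no Spark dependency. This is only an illustration of what each micro-batch computes, not part of the original application:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BatchWordCount {

    // Mirrors one micro-batch: flatMap(split) -> mapToPair(word, 1) -> reduceByKey(+)
    static Map<String, Integer> countWords(String batch) {
        return Arrays.stream(batch.split(" "))
                     .collect(Collectors.toMap(Function.identity(), w -> 1, Integer::sum));
    }

    public static void main(String[] args) {
        // Same input as the working example further down
        System.out.println(countWords("It is working! Life is beautiful!"));
    }
}
```

Each streaming batch produces exactly this kind of word→count map, which `wordCounts.print()` then displays.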

When I run this standalone application, the log loops over the following lines:

14/10/08 13:16:44 INFO JobScheduler: Finished job streaming job 1412767004000 ms.0 from job set of time 1412767004000 ms
14/10/08 13:16:44 INFO JobScheduler: Total delay: 0.023 s for time 1412767004000 ms (execution: 0.019 s)
14/10/08 13:16:44 INFO ShuffledRDD: Removing RDD 428 from persistence list
14/10/08 13:16:44 INFO BlockManager: Removing RDD 428
14/10/08 13:16:44 INFO MappedRDD: Removing RDD 427 from persistence list
14/10/08 13:16:44 INFO BlockManager: Removing RDD 427
14/10/08 13:16:44 INFO FlatMappedRDD: Removing RDD 426 from persistence list
14/10/08 13:16:44 INFO BlockManager: Removing RDD 426
14/10/08 13:16:44 INFO BlockRDD: Removing RDD 425 from persistence list
14/10/08 13:16:44 INFO SocketInputDStream: Removing blocks of RDD BlockRDD[425] at BlockRDD at ReceiverInputDStream.scala:69 of time 1412767004000 ms
14/10/08 13:16:44 INFO BlockManager: Removing RDD 425
14/10/08 13:16:44 INFO SocketReceiver: Stopped receiving
14/10/08 13:16:44 INFO SocketReceiver: Closed socket to localhost:9999
14/10/08 13:16:44 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Retrying connecting to localhost:9999
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Retrying connecting to localhost:9999: 
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Called receiver onStop
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Deregistering receiver 0
14/10/08 13:16:44 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Retrying connecting to localhost:9999
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Stopped receiver 0
14/10/08 13:16:45 INFO ReceiverTracker: Stream 0 received 0 blocks
14/10/08 13:16:45 INFO JobScheduler: Added jobs for time 1412767005000 ms
14/10/08 13:16:45 INFO JobScheduler: Starting job streaming job 1412767005000 ms.0 from job set of time 1412767005000 ms
14/10/08 13:16:45 INFO SparkContext: Starting job: take at DStream.scala:608
14/10/08 13:16:45 INFO DAGScheduler: Registering RDD 435 (map at MappedDStream.scala:35)
14/10/08 13:16:45 INFO DAGScheduler: Got job 217 (take at DStream.scala:608) with 1 output partitions (allowLocal=true)
14/10/08 13:16:45 INFO DAGScheduler: Final stage: Stage 433(take at DStream.scala:608)
14/10/08 13:16:45 INFO DAGScheduler: Parents of final stage: List(Stage 434)
14/10/08 13:16:45 INFO DAGScheduler: Missing parents: List()
14/10/08 13:16:45 INFO DAGScheduler: Submitting Stage 433 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42), which has no missing parents
14/10/08 13:16:45 INFO MemoryStore: ensureFreeSpace(2256) called with curMem=23776, maxMem=277842493
14/10/08 13:16:45 INFO MemoryStore: Block broadcast_217 stored as values in memory (estimated size 2.2 KB, free 264.9 MB)
14/10/08 13:16:45 INFO DAGScheduler: Submitting 1 missing tasks from Stage 433 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42)
14/10/08 13:16:45 INFO TaskSchedulerImpl: Adding task set 433.0 with 1 tasks
14/10/08 13:16:45 INFO TaskSetManager: Starting task 0.0 in stage 433.0 (TID 217, localhost, PROCESS_LOCAL, 1008 bytes)
14/10/08 13:16:45 INFO Executor: Running task 0.0 in stage 433.0 (TID 217)
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms
14/10/08 13:16:45 INFO Executor: Finished task 0.0 in stage 433.0 (TID 217). 822 bytes result sent to driver
14/10/08 13:16:45 INFO TaskSetManager: Finished task 0.0 in stage 433.0 (TID 217) in 4 ms on localhost (1/1)
14/10/08 13:16:45 INFO DAGScheduler: Stage 433 (take at DStream.scala:608) finished in 0.006 s
14/10/08 13:16:45 INFO TaskSchedulerImpl: Removed TaskSet 433.0, whose tasks have all completed, from pool 
14/10/08 13:16:45 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.009386933 s
14/10/08 13:16:45 INFO SparkContext: Starting job: take at DStream.scala:608
14/10/08 13:16:45 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 108 is 82 bytes
14/10/08 13:16:45 INFO DAGScheduler: Got job 218 (take at DStream.scala:608) with 1 output partitions (allowLocal=true)
14/10/08 13:16:45 INFO DAGScheduler: Final stage: Stage 435(take at DStream.scala:608)
14/10/08 13:16:45 INFO DAGScheduler: Parents of final stage: List(Stage 436)
14/10/08 13:16:45 INFO DAGScheduler: Missing parents: List()
14/10/08 13:16:45 INFO DAGScheduler: Submitting Stage 435 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42), which has no missing parents
14/10/08 13:16:45 INFO MemoryStore: ensureFreeSpace(2256) called with curMem=26032, maxMem=277842493
14/10/08 13:16:45 INFO MemoryStore: Block broadcast_218 stored as values in memory (estimated size 2.2 KB, free 264.9 MB)
14/10/08 13:16:45 INFO DAGScheduler: Submitting 1 missing tasks from Stage 435 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42)
14/10/08 13:16:45 INFO TaskSchedulerImpl: Adding task set 435.0 with 1 tasks
14/10/08 13:16:45 INFO TaskSetManager: Starting task 0.0 in stage 435.0 (TID 218, localhost, PROCESS_LOCAL, 1008 bytes)
14/10/08 13:16:45 INFO Executor: Running task 0.0 in stage 435.0 (TID 218)
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
14/10/08 13:16:45 INFO Executor: Finished task 0.0 in stage 435.0 (TID 218). 822 bytes result sent to driver
14/10/08 13:16:45 INFO TaskSetManager: Finished task 0.0 in stage 435.0 (TID 218) in 3 ms on localhost (1/1)
14/10/08 13:16:45 INFO TaskSchedulerImpl: Removed TaskSet 435.0, whose tasks have all completed, from pool 
14/10/08 13:16:45 INFO DAGScheduler: Stage 435 (take at DStream.scala:608) finished in 0.003 s
14/10/08 13:16:45 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.008348754 s
-------------------------------------------
Time: 1412767005000 ms
-------------------------------------------

On the web UI I can see the following:

(screenshot of the Spark Streaming web UI)

Apparently, when I type some sample words into `netcat -lk 9999`, nothing happens.
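The WARN line `Restarting receiver with delay 2000 ms: Retrying connecting to localhost:9999` in the log shows the receiver cannot reach the socket, which usually means no listener was up when the job started. A hypothetical plain-JDK pre-check (not part of the original code) that could be run before `jssc.start()` to confirm something is listening:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // Returns true if something is accepting connections on host:port.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isListening("localhost", 9999, 500));
    }
}
```

If this prints `false`, starting the streaming job will only produce the retry loop seen in the log above.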

Can someone help me figure out how to get this example working?

Thanks

mit*_*hra 8

As given in the comments, first run this in a console:

nc -lk 9999
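If `nc` is not available, a minimal plain-Java stand-in for `nc -lk` (an illustrative assumption, not part of the original answer) could serve a batch of lines to one connecting client:

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;

public class LineServer {

    // Listens on the given port, accepts one client, sends each line, then closes.
    public static void serveOnce(int port, List<String> lines) throws IOException {
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            for (String line : lines) {
                out.println(line);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        serveOnce(9999, List.of("It is working! Life is beautiful!"));
    }
}
```

Unlike `nc -lk`, this sketch serves a single connection and exits; it is only meant to show what the Spark receiver expects on the socket.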

Then, from the Spark folder, run the following command:

bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999

Now type some words into the console tab where nc is running:

It is working! Life is beautiful!

and check the output in the Spark console:

(beautiful!,1)
(working!,1)
(is,2)
(It,1)
(Life,1)

If you keep typing words, the program will keep printing counts for each batch. Hope this helps.
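Note that each batch above is counted independently. Spark Streaming's `updateStateByKey` can maintain a running total across batches; its merge semantics, sketched in plain Java without Spark (an illustration, not the original answer's code):

```java
import java.util.HashMap;
import java.util.Map;

public class RunningWordCount {

    private final Map<String, Integer> totals = new HashMap<>();

    // Merges one batch's counts into the running totals, roughly what
    // updateStateByKey does per key in Spark Streaming.
    public Map<String, Integer> update(Map<String, Integer> batchCounts) {
        batchCounts.forEach((word, n) -> totals.merge(word, n, Integer::sum));
        return totals;
    }
}
```

With this, typing "is" again in a later batch would bump the existing `(is,2)` total rather than starting over at 1.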