Spark Scala UDP: receive on a listening port

Asked by use*_*581 (score 3) · Tags: sockets, udp, scala, apache-spark, spark-streaming

The example in http://spark.apache.org/docs/latest/streaming-programming-guide.html lets me receive data over a TCP stream by listening on port 9999:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
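For intuition about what one batch computes: every second, Spark takes the lines received during that interval, splits them into words and adds up a 1 per word. A minimal plain-Scala sketch of the same computation on an in-memory batch (BatchWordCount is a hypothetical name; this ignores Spark's partitioning and the distributed shuffle behind reduceByKey):

```scala
// Plain-Scala sketch (no Spark) of the per-batch word count above.
// BatchWordCount is a hypothetical name used only for illustration.
object BatchWordCount {
  def countWords(batch: Seq[String]): Map[String, Int] =
    batch
      .flatMap(_.split(" "))                                  // like lines.flatMap(_.split(" "))
      .map(word => (word, 1))                                 // like words.map(word => (word, 1))
      .groupBy(_._1)                                          // gather the pairs for each word...
      .map { case (word, ones) => word -> ones.map(_._2).sum } // ...and sum them, like reduceByKey(_ + _)
}
```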

I can send data over TCP by starting a data server on my Linux system with $ nc -lk 9999

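Problem
I need to receive a stream from an Android phone that sends over UDP, but in Scala/Spark
val lines = ssc.socketTextStream("localhost", 9999)
only receives TCP streams.

How can I receive a UDP stream with Scala + Spark in a similarly simple way and create a Spark DStream from it?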

Answer by Yuv*_*kov (score 5)

There is nothing built in, but it is not much work to do it yourself. Here is a simple solution I made, based on a custom UdpSocketInputDStream[T]:

import java.io.{ByteArrayInputStream, InputStream}
import java.net.{DatagramPacket, DatagramSocket, InetAddress}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

import scala.reflect.ClassTag
import scala.util.control.NonFatal

class UdpSocketInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    new UdpSocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

class UdpSocketReceiver[T: ClassTag](host: String,
                                     port: Int,
                                     bytesToObjects: InputStream => Iterator[T],
                                     storageLevel: StorageLevel) extends Receiver[T](storageLevel) {

  // Written by onStart()/onStop() and read by the receiving thread
  @volatile private var udpSocket: DatagramSocket = _

  override def onStart(): Unit = {
    try {
      // Bind to host:port. "localhost" only accepts loopback traffic; "0.0.0.0" accepts all interfaces.
      udpSocket = new DatagramSocket(port, InetAddress.getByName(host))
    } catch {
      // Binding fails with a SocketException (e.g. port in use) or an UnknownHostException.
      // UDP is connectionless, so a ConnectException is never thrown here.
      case NonFatal(e) =>
        restart(s"Error binding UDP socket to $host:$port", e)
        return
    }

    // Start the thread that receives datagrams
    new Thread("Udp Socket Receiver") {
      setDaemon(true)

      override def run(): Unit = {
        receive()
      }
    }.start()
  }

  /** Receive datagrams on the bound socket until the receiver is stopped */
  def receive(): Unit = {
    try {
      // Each datagram is read into a 2048-byte buffer; longer datagrams are truncated
      val buffer = new Array[Byte](2048)
      val packet = new DatagramPacket(buffer, buffer.length)

      // Loop until stopped, handling one datagram per iteration
      while (!isStopped()) {
        packet.setLength(buffer.length) // allow the full buffer for every receive
        udpSocket.receive(packet)       // blocks until a datagram arrives

        // Convert only the bytes belonging to this datagram and store the results
        val iterator = bytesToObjects(
          new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
        while (!isStopped() && iterator.hasNext) {
          store(iterator.next())
        }
      }
    } catch {
      case NonFatal(e) =>
        // onStop() closes the socket, which makes receive() throw; only restart on real errors
        if (!isStopped()) {
          restart("Error receiving data", e)
        }
    } finally {
      onStop()
    }
  }

  override def onStop(): Unit = {
    synchronized {
      if (udpSocket != null) {
        udpSocket.close()
        udpSocket = null
      }
    }
  }
}
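The heart of receive() is plain java.net: block in DatagramSocket.receive, then decode only the slice of the buffer given by packet.getOffset and packet.getLength, since the rest of the buffer may hold stale bytes. A minimal Spark-free sketch of one such round trip over the loopback interface, handy for checking the decoding step in isolation (DatagramRoundTrip and roundTrip are hypothetical names):

```scala
import java.io.{BufferedReader, ByteArrayInputStream, InputStreamReader}
import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets

// Hypothetical helper: sends one datagram to itself over loopback and decodes it
// the same way UdpSocketReceiver.receive() does (offset + length, not the whole buffer).
object DatagramRoundTrip {
  def roundTrip(message: String, bufferSize: Int = 2048): String = {
    val loopback = InetAddress.getLoopbackAddress
    val receiver = new DatagramSocket(0, loopback) // port 0: let the OS pick a free port
    val sender = new DatagramSocket()
    try {
      receiver.setSoTimeout(5000) // fail after 5 s instead of blocking forever
      val bytes = message.getBytes(StandardCharsets.UTF_8)
      sender.send(new DatagramPacket(bytes, bytes.length, loopback, receiver.getLocalPort))

      val buffer = new Array[Byte](bufferSize)
      val packet = new DatagramPacket(buffer, buffer.length)
      receiver.receive(packet) // blocks until the datagram arrives

      // Only the first getLength bytes (starting at getOffset) belong to this datagram
      val in = new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength)
      val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
      reader.readLine()
    } finally {
      sender.close()
      receiver.close()
    }
  }
}
```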

To add the method to StreamingContext itself, we enrich it with an implicit class:

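import java.io.InputStream

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

import scala.reflect.ClassTag

object Implicits {
  // Adds udpSocketStream(...) to StreamingContext wherever Implicits._ is imported
  implicit class StreamingContextOps(val ssc: StreamingContext) extends AnyVal {
    def udpSocketStream[T: ClassTag](host: String,
                                     port: Int,
                                     converter: InputStream => Iterator[T],
                                     storageLevel: StorageLevel): InputDStream[T] = {
      new UdpSocketInputDStream(ssc, host, port, converter, storageLevel)
    }
  }
}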
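The implicit class works because, once Implicits._ is imported, the compiler effectively rewrites ssc.udpSocketStream(...) into StreamingContextOps(ssc).udpSocketStream(...). A minimal Spark-free sketch of the same pattern on a hypothetical Counter class; it leaves out extends AnyVal (which the answer uses to usually avoid allocating the wrapper) so that it compiles in any enclosing scope:

```scala
// Hypothetical types illustrating the "enrich my library" pattern used by Implicits above.
class Counter(val start: Int)

object CounterImplicits {
  // Implicit conversion Counter => CounterOps, giving every Counter a plus method
  implicit class CounterOps(val c: Counter) {
    def plus(n: Int): Counter = new Counter(c.start + n)
  }
}

object CounterDemo {
  import CounterImplicits._

  // Counter has no plus method of its own; it resolves through CounterOps
  def demo(): Int = new Counter(1).plus(2).start
}
```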

This is how you call it all:

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TestRunner {
  import Implicits._

  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext("local[*]", "udpTest")
    val ssc = new StreamingContext(sparkContext, Seconds(4))

    val stream = ssc.udpSocketStream("localhost", 
                                     3003, 
                                     bytesToLines, 
                                     StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // Converter: reads a datagram's bytes as UTF-8 text and yields one String per line
  def bytesToLines(inputStream: InputStream): Iterator[String] = {
    val dataInputStream = new BufferedReader(
      new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    new NextIterator[String] {
      protected override def getNext(): String = {
        val nextValue = dataInputStream.readLine()
        if (nextValue == null) {
          finished = true
        }
        nextValue
      }

      protected override def close(): Unit = {
        dataInputStream.close()
      }
    }
  }

  // Iterator that computes elements lazily via getNext() and calls close() once exhausted
  abstract class NextIterator[U] extends Iterator[U] {
    protected var finished = false
    private var gotNext = false
    private var nextValue: U = _
    private var closed = false

    override def next(): U = {
      if (!hasNext) {
        throw new NoSuchElementException("End of stream")
      }
      gotNext = false
      nextValue
    }

    override def hasNext: Boolean = {
      if (!finished) {
        if (!gotNext) {
          nextValue = getNext()
          if (finished) {
            closeIfNeeded()
          }
          gotNext = true
        }
      }
      !finished
    }

    def closeIfNeeded(): Unit = {
      if (!closed) {
        closed = true
        close()
      }
    }

    protected def getNext(): U
    protected def close(): Unit
  }
}
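If hand-rolling NextIterator feels heavy, a converter with the same InputStream => Iterator[String] shape can probably be built on scala.io.Source, which already yields lines lazily. A minimal sketch, assuming UTF-8 payloads (SourceConverter and bytesToLinesViaSource are hypothetical names):

```scala
import java.io.InputStream
import scala.io.{Codec, Source}

// Alternative to bytesToLines + NextIterator, assuming the payload is UTF-8 text.
object SourceConverter {
  def bytesToLinesViaSource(inputStream: InputStream): Iterator[String] =
    Source.fromInputStream(inputStream)(Codec.UTF8).getLines() // lazy, one String per line
}
```

It could be passed in place of bytesToLines, e.g. ssc.udpSocketStream("localhost", 3003, SourceConverter.bytesToLinesViaSource, StorageLevel.MEMORY_AND_DISK_SER_2). Unlike bytesToLines it never closes the stream, which is harmless for the in-memory ByteArrayInputStream the receiver creates for each datagram.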

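Most of this code is taken from Spark's SocketInputDStream[T]; I simply reused it. I also took NextIterator, which bytesToLines uses to read the lines out of the packet and turn them into Strings. If you need more complex logic, you can supply it by passing your own implementation of converter: InputStream => Iterator[T].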
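As an example of such a converter, suppose (purely as an assumption; the question does not specify the phone's format) that each line of a datagram is a sensorId,value record. A minimal sketch that parses each datagram into a hypothetical Reading case class and drops malformed lines:

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import scala.util.Try

// Hypothetical record type for an assumed "sensorId,value" payload, e.g. "accel-x,0.25"
final case class Reading(sensorId: String, value: Double)

object ReadingConverter {
  // Parses one line; returns None for lines that don't match the assumed format
  def parseLine(line: String): Option[Reading] =
    line.split(",", 2) match {
      case Array(id, v) => Try(v.trim.toDouble).toOption.map(Reading(id.trim, _))
      case _            => None
    }

  // Converter with the InputStream => Iterator[T] shape expected by udpSocketStream
  def bytesToReadings(inputStream: InputStream): Iterator[Reading] = {
    val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    Iterator
      .continually(reader.readLine())
      .takeWhile(_ != null)                 // readLine returns null at the end of the datagram
      .flatMap(line => parseLine(line).iterator)
  }
}
```

With it, ssc.udpSocketStream("0.0.0.0", 3003, ReadingConverter.bytesToReadings, StorageLevel.MEMORY_AND_DISK_SER_2) would yield an InputDStream[Reading]. Binding to 0.0.0.0 rather than localhost matters for the phone use case: a socket bound to localhost only accepts loopback traffic, so packets from another device would never arrive.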

Testing it with a simple UDP packet:

echo -n "hello hello hello!" >/dev/udp/localhost/3003
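The /dev/udp/host/port redirection is a bash feature and is not available in every shell. A minimal Scala sketch of a sender that can stand in for the phone, sending each message as its own datagram (UdpSender and send are hypothetical names):

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for the phone: one UDP datagram per message.
object UdpSender {
  def send(host: String, port: Int, messages: Seq[String]): Int = {
    val socket = new DatagramSocket() // the OS binds it to an ephemeral local port
    try {
      val address = InetAddress.getByName(host)
      messages.foreach { msg =>
        val bytes = msg.getBytes(StandardCharsets.UTF_8)
        socket.send(new DatagramPacket(bytes, bytes.length, address, port))
      }
      messages.size // number of datagrams sent
    } finally socket.close()
  }
}
```

For example, UdpSender.send("localhost", 3003, Seq("hello hello hello!")) should produce the same output as the echo command.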

Output:

-------------------------------------------
Time: 1482676728000 ms
-------------------------------------------
hello hello hello!

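Of course, this still needs more testing. It also carries a hidden assumption: each DatagramPacket is received into a 2048-byte buffer, so longer datagrams are truncated. That is something you may want to change.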
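One way to see the 2048-byte assumption in action before choosing a different size: the sketch below sends one datagram over loopback and reports how many bytes fit into a receive buffer of a given size (TruncationDemo and receivedLength are hypothetical names). The largest possible UDP payload over IPv4 is 65,507 bytes, so a buffer of that size never truncates, at the cost of a larger allocation.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}

// Hypothetical demo: a datagram longer than the receive buffer is truncated to the buffer size.
object TruncationDemo {
  def receivedLength(payloadSize: Int, bufferSize: Int): Int = {
    val loopback = InetAddress.getLoopbackAddress
    val receiver = new DatagramSocket(0, loopback) // port 0: let the OS pick a free port
    val sender = new DatagramSocket()
    try {
      receiver.setSoTimeout(5000) // fail after 5 s instead of blocking forever
      val payload = Array.fill[Byte](payloadSize)(42)
      sender.send(new DatagramPacket(payload, payload.length, loopback, receiver.getLocalPort))

      val packet = new DatagramPacket(new Array[Byte](bufferSize), bufferSize)
      receiver.receive(packet)
      packet.getLength // bytes actually placed in the buffer
    } finally {
      sender.close()
      receiver.close()
    }
  }
}
```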