I have been trying to implement a receiver for Spark 0.9. I capture packets with the jNetPcap library and need to pass them to Spark from Scala. Is it enough to put the packet-capture code inside a def receive() method?
Edit: here is the code from that link, which captures packets using the jNetPcap library:
import java.util.Date
import org.jnetpcap.Pcap
import org.jnetpcap.packet.PcapPacket
import org.jnetpcap.packet.PcapPacketHandler

object PacketCapture1 {
  def main(args: Array[String]) {
    val snaplen = 64 * 1024             // capture whole packets, no truncation
    val flags = Pcap.MODE_PROMISCUOUS   // capture all packets on the interface
    val timeout = 10 * 1000             // read timeout in milliseconds
    val errbuf = new java.lang.StringBuilder()

    val pcap = Pcap.openLive("eth0", snaplen, flags, timeout, errbuf)
    if (pcap == null) {
      println("Error: " + errbuf.toString)
      return
    }
    println(pcap)

    val jpacketHandler = new PcapPacketHandler[String]() {
      def nextPacket(packet: PcapPacket, user: String) {
        printf("Received packet at %s caplen=%4d len=%4d %s\n",
          new Date(packet.getCaptureHeader.timestampInMillis()),
          packet.getCaptureHeader.caplen(),
          packet.getCaptureHeader.wirelen(),
          user)
      }
    }

    pcap.loop(30, jpacketHandler, "jNetPcap works!")
    pcap.close()
  }
}
How can I implement a Spark receiver using the packets captured by this code?
You have to create a custom NetworkReceiver (or Receiver in Spark 1.0+) and implement its onStart() method. For Spark 0.9, see http://spark.apache.org/docs/0.9.1/streaming-custom-receivers.html
For Spark 1.0 (strongly recommended), see http://spark.apache.org/docs/latest/streaming-custom-receivers.html
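For the Spark 1.0+ API, a minimal sketch might look like the following. It wraps the jNetPcap calls from your code inside a custom Receiver: onStart() spawns a thread that opens the device and pushes a textual summary of each packet into Spark via store(). The class name PcapReceiver, the summary format, and the batch size passed to pcap.loop are all illustrative choices, not anything prescribed by Spark or jNetPcap.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.jnetpcap.Pcap
import org.jnetpcap.packet.{PcapPacket, PcapPacketHandler}

// Hypothetical custom receiver wrapping jNetPcap capture (sketch only).
class PcapReceiver(device: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Capture on a separate thread so onStart() returns immediately,
    // as the Receiver contract requires.
    new Thread("Pcap Receiver") {
      override def run() { capture() }
    }.start()
  }

  def onStop() {
    // The capture thread checks isStopped() and exits on its own.
  }

  private def capture() {
    val snaplen = 64 * 1024
    val flags = Pcap.MODE_PROMISCUOUS
    val timeout = 10 * 1000
    val errbuf = new java.lang.StringBuilder()

    val pcap = Pcap.openLive(device, snaplen, flags, timeout, errbuf)
    if (pcap == null) {
      // Ask Spark to restart the receiver if the device could not be opened.
      restart("Error opening " + device + ": " + errbuf.toString)
      return
    }

    val handler = new PcapPacketHandler[String]() {
      def nextPacket(packet: PcapPacket, user: String) {
        // Hand each captured packet to Spark as a simple string record.
        store("ts=%d caplen=%d wirelen=%d".format(
          packet.getCaptureHeader.timestampInMillis(),
          packet.getCaptureHeader.caplen(),
          packet.getCaptureHeader.wirelen()))
      }
    }

    // Capture in small batches so we can check isStopped() between loops.
    while (!isStopped()) {
      pcap.loop(10, handler, device)
    }
    pcap.close()
  }
}

You would then plug it into a StreamingContext in the driver, for example with val packets = ssc.receiverStream(new PcapReceiver("eth0")), and apply the usual DStream transformations to the resulting stream.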