我一直在尝试为SPARK 0.9实现接收器.我使用Jnetpcap库捕获了数据包,需要将它传递给Scala中的spark.用"def receive()"方法编写数据包的捕获部分是否足够?
编辑:以下是此链接中使用Jnetpcap库捕获数据包的代码:
import java.util.Date
import java.lang.StringBuilder
import org.jnetpcap.Pcap
import org.jnetpcap.packet.PcapPacket
import org.jnetpcap.packet.PcapPacketHandler
object PacketCapture1 {
def main(args: Array[String]){
val snaplen = 64 * 1024 // Capture all packets, no trucation
val flags = Pcap.MODE_PROMISCUOUS // capture all packets
val timeout = 10 * 1000
//val errbuf = new StringBuilder()
val jsb = new java.lang.StringBuilder()
val errbuf = new StringBuilder(jsb);
val pcap = Pcap.openLive("eth0", snaplen, flags, timeout, errbuf)
if (pcap == null) {
println("Error : " …Run Code Online (Sandbox Code Playgroud) 在Scala中我发生了一些奇怪的事情.我正在尝试使用第三方库
org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream
并获得No ClassTag异常.我模拟下面的场景,因为人们可以将Util视为第三方库.为什么会这样?
object Util {
def fun1[K: ClassTag, M: ClassTag, KD: ClassTag, MD: ClassTag]: Unit = {
println("In function version 2")
}
}
class ClassTagIssue[K, M, KD, MD] {
def build: Unit = {
Util.fun1[K, M, KD, MD]
}
}
object ClassTagIssue {
def main(args: Array[String]) {
new ClassTagIssue[String, String, String, String]().build
}
}
Run Code Online (Sandbox Code Playgroud)
我正在尝试运行此代码并获得以下异常
Error:(23, 14) No ClassTag available for K
Util.fun1[K, M, KD, MD]
^
Error:(23, 14) not enough arguments for method fun1: (implicit evidence$1: scala.reflect.ClassTag[K], …Run Code Online (Sandbox Code Playgroud) 我正在使用 Spark 1.5.1。
在流中context我得到SQLContext如下
SQLContext sqlContext = SQLContext.getOrCreate(records.context());
DataFrame dataFrame = sqlContext.createDataFrame(record, SchemaRecord.class);
dataFrame.registerTempTable("records");
record 是一个 JavaRDD 每个 Record 具有以下结构
public class SchemaRecord implements Serializable {
private static final long serialVersionUID = 1L;
private String msisdn;
private String application_type;
//private long uplink_bytes = 0L;
}
Run Code Online (Sandbox Code Playgroud)
当 msisdn 和 application_type 等字段类型只是字符串时,一切正常。
当我添加另一个字段,如Uplink_bytes 是Long类型时,我在createDataFrame 处得到以下NullPointer Exception
Exception in thread "main" java.lang.NullPointerException
at org.spark-project.guava.reflect.TypeToken.method(TypeToken.java:465)
at
org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:103)
at
org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at …Run Code Online (Sandbox Code Playgroud) 我有一个只有一栏的CSV文件,行定义如下:
123 || food || fruit
123 || food || fruit || orange
123 || food || fruit || apple
Run Code Online (Sandbox Code Playgroud)
我想用一个单列和不同的行值创建一个csv文件:
orange
apple
Run Code Online (Sandbox Code Playgroud)
我尝试使用以下代码:
val data = sc.textFile("fruits.csv")
val rows = data.map(_.split("||"))
val rddnew = rows.flatMap( arr => {
val text = arr(0)
val words = text.split("||")
words.map( word => ( word, text ) )
} )
Run Code Online (Sandbox Code Playgroud)
但是这段代码并没有给我想要的正确结果。
有人可以帮我吗?
我们正在使用基于 Spark Streaming Receiver 的方法,并且我们刚刚启用了检查指向来摆脱数据丢失问题。
Spark 版本是1.6.1,我们正在接收来自 Kafka 主题的消息。
我在ssc里面使用,foreachRDD方法 of DStream,所以它抛出 Not Serializable 异常。
我尝试扩展 Serializable 类,但仍然出现相同的错误。它仅在我们启用检查点时发生。
def main(args: Array[String]): Unit = {
val checkPointLocation = "/path/to/wal"
val ssc = StreamingContext.getOrCreate(checkPointLocation, () => createContext(checkPointLocation))
ssc.start()
ssc.awaitTermination()
}
def createContext (checkPointLocation: String): StreamingContext ={
val sparkConf = new SparkConf().setAppName("Test")
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(sparkConf, Seconds(40))
ssc.checkpoint(checkPointLocation)
val sc = ssc.sparkContext
val sqlContext: SQLContext = new HiveContext(sc)
val kafkaParams = Map("group.id" …Run Code Online (Sandbox Code Playgroud) 我必须使用以下用例设计一个Spark Streaming应用程序。我正在为此寻找最佳方法。
我有一个将数据推入1000多个不同主题的应用程序,每个主题都有不同的用途。Spark流式处理将从每个主题接收数据,并且在处理之后,它将回写到相应的另一个主题。
Ex.
Input Type 1 Topic --> Spark Streaming --> Output Type 1 Topic
Input Type 2 Topic --> Spark Streaming --> Output Type 2 Topic
Input Type 3 Topic --> Spark Streaming --> Output Type 3 Topic
.
.
.
Input Type N Topic --> Spark Streaming --> Output Type N Topic and so on.
Run Code Online (Sandbox Code Playgroud)
我需要回答以下问题。
你们还有其他问题吗?
非常感谢您的回应。
我使用dstream.saveAsObjectFiles("/temObj")它在 hdfs 中显示多个文件的方法在火花流中保留了对象文件。
temObj-1506338844000
temObj-1506338848000
temObj-1506338852000
temObj-1506338856000
temObj-1506338860000
Run Code Online (Sandbox Code Playgroud)
阅读完所有文件后,我想删除所有 temObj 文件。在火花中做到这一点的最佳方式是什么?我试过
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
hdfs.delete(new org.apache.hadoop.fs.Path(Path), true)
Run Code Online (Sandbox Code Playgroud)
但它一次只能删除一个文件夹
我有以下 Spark 流示例:
val conf = new SparkConf().setAppName("Name").setMaster("local")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))
val directoryStream = ssc.textFileStream("""file:///C:/Users/something/something""")
directoryStream.foreachRDD(file => {
println(file.count())
})
ssc.start()
ssc.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
即使文件夹为空,它也会每 2 秒打印 0,就像文件夹中存在空文件一样。我希望它只foreachRDD在文件夹中存在新文件时才进入。有什么我做错了吗?
我使用的是 Spark 1.6 和 Scala 2.10.7。
我正在编写一个小示例,Spark Structured Streaming其中我试图处理netstat命令的输出,但无法弄清楚如何调用该window函数。
这些是我的 build.sbt 的相关行:
scalaVersion := "2.11.4"
scalacOptions += "-target:jvm-1.8"
libraryDependencies ++= {
val sparkVer = "2.3.0"
Seq(
"org.apache.spark" %% "spark-streaming" % sparkVer % "provided",
"org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVer % "provided",
"org.apache.spark" %% "spark-core" % sparkVer % "provided" withSources(),
"org.apache.spark" %% "spark-hive" % sparkVer % "provided",
)
}
Run Code Online (Sandbox Code Playgroud)
和代码:
case class NetEntry(val timeStamp: java.sql.Timestamp, val sourceHost: String, val targetHost: String, val status: String)
def convertToNetEntry(x: String): NetEntry = {
// tcp …Run Code Online (Sandbox Code Playgroud) 我在火花流应用程序中启用了检查点,并且在作为依赖项下载的类上遇到此错误。
没有检查点,该应用程序运行良好。
错误:
com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer
Serialization stack:
- object not serializable (class: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer, value: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer@46c7c593)
- field (class: com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, name: _paranamer, type: interface com.fasterxml.jackson.module.paranamer.shaded.Paranamer)
- object (class com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector@39d62e47)
- field (class: com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, name: _secondary, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
- object (class com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair@7a925ac4)
- field (class: com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, name: _primary, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
- object (class com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair@203b98cf)
- field (class: com.fasterxml.jackson.databind.cfg.BaseSettings, name: _annotationIntrospector, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
- object (class com.fasterxml.jackson.databind.cfg.BaseSettings, com.fasterxml.jackson.databind.cfg.BaseSettings@78c34153)
- field (class: com.fasterxml.jackson.databind.cfg.MapperConfig, name: _base, type: class com.fasterxml.jackson.databind.cfg.BaseSettings)
- …Run Code Online (Sandbox Code Playgroud)