标签: spark-streaming

在SPARK中实现Receiver

我一直在尝试为SPARK 0.9实现接收器.我使用Jnetpcap库捕获了数据包,需要将它传递给Scala中的spark.用"def receive()"方法编写数据包的捕获部分是否足够?

编辑:以下是此链接中使用Jnetpcap库捕获数据包的代码:

import java.util.Date
import java.lang.StringBuilder
import org.jnetpcap.Pcap
import org.jnetpcap.packet.PcapPacket
import org.jnetpcap.packet.PcapPacketHandler

object PacketCapture1 {
  def main(args: Array[String]){
    val snaplen = 64 * 1024 // Capture all packets, no trucation
    val flags = Pcap.MODE_PROMISCUOUS // capture all packets
    val timeout = 10 * 1000
    //val errbuf = new StringBuilder()

    val jsb = new java.lang.StringBuilder()
    val errbuf = new StringBuilder(jsb);
    val pcap = Pcap.openLive("eth0", snaplen, flags, timeout, errbuf)
    if (pcap == null) {
      println("Error : " …
Run Code Online (Sandbox Code Playgroud)

scala packet-capture apache-spark spark-streaming

1
推荐指数
1
解决办法
1065
查看次数

没有ClassTag可用(Scala)

在Scala中我发生了一些奇怪的事情.我正在尝试使用第三方库

org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream

并获得No ClassTag异常.我模拟下面的场景,因为人们可以将Util视为第三方库.为什么会这样?

object Util {
    def fun1[K: ClassTag, M: ClassTag, KD: ClassTag, MD: ClassTag]: Unit = {
        println("In function version 2")
    }
}

class ClassTagIssue[K, M, KD, MD] {
    def build: Unit = {
        Util.fun1[K, M, KD, MD]
    }
}

object ClassTagIssue {
    def main(args: Array[String]) {
        new ClassTagIssue[String, String, String, String]().build
    }
}
Run Code Online (Sandbox Code Playgroud)

我正在尝试运行此代码并获得以下异常

Error:(23, 14) No ClassTag available for K
    Util.fun1[K, M, KD, MD]
             ^
Error:(23, 14) not enough arguments for method fun1: (implicit evidence$1: scala.reflect.ClassTag[K], …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka apache-spark spark-streaming

1
推荐指数
1
解决办法
1157
查看次数

1.5.1| 星火流 | 带有 SQL createDataFrame 的 NullPointerException

我正在使用 Spark 1.5.1。

在流中context我得到SQLContext如下

SQLContext sqlContext = SQLContext.getOrCreate(records.context()); DataFrame dataFrame = sqlContext.createDataFrame(record, SchemaRecord.class); dataFrame.registerTempTable("records");

record 是一个 JavaRDD 每个 Record 具有以下结构

public class SchemaRecord implements Serializable {

private static final long serialVersionUID = 1L; 
private String msisdn; 
private String application_type; 
//private long uplink_bytes = 0L;
}
Run Code Online (Sandbox Code Playgroud)

当 msisdn 和 application_type 等字段类型只是字符串时,一切正常。

当我添加另一个字段,如Uplink_bytes 是Long类型时,我在createDataFrame 处得到以下NullPointer Exception

Exception in thread "main" java.lang.NullPointerException
at org.spark-project.guava.reflect.TypeToken.method(TypeToken.java:465)
at 
org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:103)
at 
org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming apache-spark-sql

1
推荐指数
1
解决办法
2844
查看次数

将管道分隔文件转换为Spark DataFrame转换为CSV文件

我有一个只有一栏的CSV文件,行定义如下:

123 || food || fruit
123 || food || fruit || orange 
123 || food || fruit || apple
Run Code Online (Sandbox Code Playgroud)

我想用一个单列和不同的行值创建一个csv文件:

orange
apple
Run Code Online (Sandbox Code Playgroud)

我尝试使用以下代码:

 val data = sc.textFile("fruits.csv")
 val rows = data.map(_.split("||"))
 val rddnew = rows.flatMap( arr => {
 val text = arr(0) 
 val words = text.split("||")
 words.map( word => ( word, text ) )
 } )
Run Code Online (Sandbox Code Playgroud)

但是这段代码并没有给我想要的正确结果。
有人可以帮我吗?

csv scala apache-spark spark-streaming rdd

1
推荐指数
1
解决办法
6031
查看次数

Spark Streaming 检查点引发 Not Serializable 异常

我们正在使用基于 Spark Streaming Receiver 的方法,并且我们刚刚启用了检查指向来摆脱数据丢失问题。

Spark 版本是1.6.1,我们正在接收来自 Kafka 主题的消息。

我在ssc里面使用,foreachRDD方法 of DStream,所以它抛出 Not Serializable 异常。

我尝试扩展 Serializable 类,但仍然出现相同的错误。它仅在我们启用检查点时发生。

代码是:

def main(args: Array[String]): Unit = {

    val checkPointLocation = "/path/to/wal"
    val ssc = StreamingContext.getOrCreate(checkPointLocation, () => createContext(checkPointLocation))
    ssc.start()
    ssc.awaitTermination()
  }

    def createContext (checkPointLocation: String): StreamingContext ={

        val sparkConf = new SparkConf().setAppName("Test")
        sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
        val ssc = new StreamingContext(sparkConf, Seconds(40))
        ssc.checkpoint(checkPointLocation)
        val sc = ssc.sparkContext
        val sqlContext: SQLContext = new HiveContext(sc)
        val kafkaParams = Map("group.id" …
Run Code Online (Sandbox Code Playgroud)

scala spark-streaming

1
推荐指数
1
解决办法
1621
查看次数

适用于1000多个主题的Spark Streaming设计

我必须使用以下用例设计一个Spark Streaming应用程序。我正在为此寻找最佳方法。

我有一个将数据推入1000多个不同主题的应用程序,每个主题都有不同的用途。Spark流式处理将从每个主题接收数据,并且在处理之后,它将回写到相应的另一个主题。

Ex. 

Input Type 1 Topic  --> Spark Streaming --> Output Type 1 Topic 
Input Type 2 Topic  --> Spark Streaming --> Output Type 2 Topic 
Input Type 3 Topic  --> Spark Streaming --> Output Type 3 Topic 
.
.
.
Input Type N Topic  --> Spark Streaming --> Output Type N Topic  and so on.
Run Code Online (Sandbox Code Playgroud)

我需要回答以下问题。

  1. 每个主题启动1000个以上的Spark Streaming应用程序是一个好主意吗?或者我应该为所有主题提供一个流应用程序,因为处理逻辑将是相同的?
  2. 如果是一个流上下文,那么我将如何确定哪个RDD属于哪个Kafka主题,以便在处理之后可以将其写回到其对应的OUTPUT主题?
  3. 客户端可以从Kafka添加/删除主题,如何在Spark流中动态处理?
  4. 如何在出现故障时自动重新启动作业?

你们还有其他问题吗?

非常感谢您的回应。

apache-spark spark-streaming spark-dataframe

1
推荐指数
1
解决办法
766
查看次数

如何在 Apache Spark 中删除以某个单词开头的多个 hdfs 目录

我使用dstream.saveAsObjectFiles("/temObj")它在 hdfs 中显示多个文件的方法在火花流中保留了对象文件。

temObj-1506338844000
temObj-1506338848000
temObj-1506338852000
temObj-1506338856000
temObj-1506338860000
Run Code Online (Sandbox Code Playgroud)

阅读完所有文件后,我想删除所有 temObj 文件。在火花中做到这一点的最佳方式是什么?我试过

val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
hdfs.delete(new org.apache.hadoop.fs.Path(Path), true) 
Run Code Online (Sandbox Code Playgroud)

但它一次只能删除一个文件夹

hadoop scala hdfs apache-spark spark-streaming

1
推荐指数
1
解决办法
4157
查看次数

为什么即使没有新数据可用,Spark Streaming 也会执行 foreachRDD?

我有以下 Spark 流示例:

val conf = new SparkConf().setAppName("Name").setMaster("local")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))

val directoryStream = ssc.textFileStream("""file:///C:/Users/something/something""")
directoryStream.foreachRDD(file => {
  println(file.count())
})

ssc.start()
ssc.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

即使文件夹为空,它也会每 2 秒打印 0,就像文件夹中存在空文件一样。我希望它只foreachRDD在文件夹中存在新文件时才进入。有什么我做错了吗?

我使用的是 Spark 1.6 和 Scala 2.10.7。

scala apache-spark spark-streaming

1
推荐指数
1
解决办法
674
查看次数

在 Spark Structured Streaming 中找不到“窗口”函数

我正在编写一个小示例,Spark Structured Streaming其中我试图处理netstat命令的输出,但无法弄清楚如何调用该window函数。

这些是我的 build.sbt 的相关行:

scalaVersion := "2.11.4"
scalacOptions += "-target:jvm-1.8"

libraryDependencies ++= {

  val sparkVer = "2.3.0"
  Seq(
    "org.apache.spark" %% "spark-streaming" % sparkVer % "provided",
    "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVer % "provided",
    "org.apache.spark" %% "spark-core" % sparkVer % "provided" withSources(),
    "org.apache.spark" %% "spark-hive" % sparkVer % "provided",
  )
}
Run Code Online (Sandbox Code Playgroud)

和代码:

case class NetEntry(val timeStamp: java.sql.Timestamp, val sourceHost: String, val targetHost: String, val status: String)

def convertToNetEntry(x: String): NetEntry = {
    // tcp …
Run Code Online (Sandbox Code Playgroud)

spark-streaming spark-structured-streaming

1
推荐指数
1
解决办法
2295
查看次数

启用了Spark Streaming Checkpoint的java.io.NotSerializableException

我在火花流应用程序中启用了检查点,并且在作为依赖项下载的类上遇到此错误。

没有检查点,该应用程序运行良好。

错误:

com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer
Serialization stack:
    - object not serializable (class: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer, value: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer@46c7c593)
    - field (class: com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, name: _paranamer, type: interface com.fasterxml.jackson.module.paranamer.shaded.Paranamer)
    - object (class com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector@39d62e47)
    - field (class: com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, name: _secondary, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
    - object (class com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair@7a925ac4)
    - field (class: com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, name: _primary, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
    - object (class com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair@203b98cf)
    - field (class: com.fasterxml.jackson.databind.cfg.BaseSettings, name: _annotationIntrospector, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
    - object (class com.fasterxml.jackson.databind.cfg.BaseSettings, com.fasterxml.jackson.databind.cfg.BaseSettings@78c34153)
    - field (class: com.fasterxml.jackson.databind.cfg.MapperConfig, name: _base, type: class com.fasterxml.jackson.databind.cfg.BaseSettings)
    - …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-streaming

1
推荐指数
1
解决办法
307
查看次数