小编Rav*_*mar的帖子

线程“主”中的异常java.lang.NoClassDefFoundError:org / apache / spark / streaming / StreamingContext

大家好,在下面的代码中找不到类StreamingContext。

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object Exemple {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Exemple")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2)) //this line throws error

  }
}
Run Code Online (Sandbox Code Playgroud)

这是错误:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/StreamingContext
    at Exemple$.main(Exemple.scala:16)
    at Exemple.main(Exemple.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.StreamingContext
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 2 more

Process finished with exit code 1
Run Code Online (Sandbox Code Playgroud)

我使用以下build.sbt文件:

name := "exemple"

version := "1.0.0"

scalaVersion := "2.11.11" …
Run Code Online (Sandbox Code Playgroud)

scala intellij-idea sbt apache-spark spark-streaming

2
推荐指数
1
解决办法
2001
查看次数

2 个具有相同消费者组 ID 的 Spark Stream 作业

我正在尝试对消费者群体进行实验

这是我的代码片段

public final class App {

private static final int INTERVAL = 5000;

public static void main(String[] args) throws Exception {

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "xxx:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", true);
    kafkaParams.put("auto.commit.interval.ms","1000");
    kafkaParams.put("security.protocol","SASL_PLAINTEXT");
    kafkaParams.put("sasl.kerberos.service.name","kafka");
    kafkaParams.put("retries","3");
    kafkaParams.put(GROUP_ID_CONFIG,"mygroup");
    kafkaParams.put("request.timeout.ms","210000");
    kafkaParams.put("session.timeout.ms","180000");
    kafkaParams.put("heartbeat.interval.ms","3000");
    Collection<String> topics = Arrays.asList("venkat4");

    SparkConf conf = new SparkConf();
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(INTERVAL));


    final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    ssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );

    stream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark spark-streaming

2
推荐指数
1
解决办法
6004
查看次数