Hi all — the class StreamingContext cannot be found in the code below.
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object Exemple {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Exemple")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2)) // this line throws the error
  }
}
Here is the error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/StreamingContext
at Exemple$.main(Exemple.scala:16)
at Exemple.main(Exemple.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.StreamingContext
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 2 more
Process finished with exit code 1
I am using the following build.sbt file:
name := "exemple"
version := "1.0.0"
scalaVersion := "2.11.11" …
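A runtime NoClassDefFoundError for org.apache.spark.streaming.StreamingContext (while the code compiles fine) usually means the spark-streaming artifact is missing from the dependency list, or is declared with the "provided" scope so it is on the compile classpath but not the runtime classpath. The build.sbt above is truncated, so this is a guess at the cause; below is a minimal sketch of a build.sbt that would make the snippet runnable from an IDE. The Spark version 2.2.0 is an assumption — match it to whatever spark-core version the project actually uses.

```scala
name := "exemple"

version := "1.0.0"

scalaVersion := "2.11.11"

// Assumed Spark version — keep spark-streaming identical to spark-core.
// No "provided" scope here, so spark-streaming is present when running
// directly from the IDE; restore "provided" if you deploy via spark-submit,
// where the cluster supplies the Spark jars.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "2.2.0",
  "org.apache.spark" %% "spark-streaming" % "2.2.0"
)
```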
I am trying to experiment with consumer groups.
Here is my code snippet:
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public final class App {

    private static final int INTERVAL = 5000;

    public static void main(String[] args) throws Exception {
        // Kafka consumer configuration, including the consumer group id.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "xxx:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", true);
        kafkaParams.put("auto.commit.interval.ms", "1000");
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParams.put("sasl.kerberos.service.name", "kafka");
        kafkaParams.put("retries", "3");
        kafkaParams.put(GROUP_ID_CONFIG, "mygroup");
        kafkaParams.put("request.timeout.ms", "210000");
        kafkaParams.put("session.timeout.ms", "180000");
        kafkaParams.put("heartbeat.interval.ms", "3000");

        Collection<String> topics = Arrays.asList("venkat4");

        SparkConf conf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(INTERVAL));

        // Direct stream: each Spark partition consumes from Kafka using the
        // group id configured above.
        final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );

        stream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() …