我正在尝试运行像https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala这样的示例.我从http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html上的Spark Structured Streaming Programming指南开始.
我的代码是
package io.boontadata.spark.job1
import org.apache.spark.sql.SparkSession
object DirectKafkaAggregateEvents {
val FIELD_MESSAGE_ID = 0
val FIELD_DEVICE_ID = 1
val FIELD_TIMESTAMP = 2
val FIELD_CATEGORY = 3
val FIELD_MEASURE1 = 4
val FIELD_MEASURE2 = 5
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(s"""
|Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
| <brokers> is a list of one or more Kafka brokers
| <subscribeType> sample value: subscribe
| <topics> is a list of one or more …Run Code Online (Sandbox Code Playgroud) scala sbt sbt-assembly apache-spark spark-structured-streaming
我使用HDP-2.6.3.0和Spark2包2.2.0.
我正在尝试使用Structured Streaming API编写Kafka使用者,但是在将作业提交到群集后我收到以下错误:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:553)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:198)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:90)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at com.example.KafkaConsumer.main(KafkaConsumer.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:782)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22$anonfun$apply$14.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22$anonfun$apply$14.apply(DataSource.scala:537)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22.apply(DataSource.scala:537)
at scala.util.Try.orElse(Try.scala:84)
at …Run Code Online (Sandbox Code Playgroud) uberjar apache-spark apache-spark-sql spark-structured-streaming