tas*_*sha 1 python google-cloud-platform pyspark google-cloud-dataproc
我尝试通过 Dataproc UI 提交 pyspark 作业并不断收到错误,看起来它没有加载 kafka 流包。
这是我的工作中 UI 提供的 REST 命令:
POST /v1/projects/projectname/regions/global/jobs:submit/
{
"projectId": "projectname",
"job": {
"placement": {
"clusterName": "cluster-main"
},
"reference": {
"jobId": "job-33ab811a"
},
"pysparkJob": {
"mainPythonFileUri": "gs://projectname/streaming.py",
"args": [
"--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0"
],
"jarFileUris": [
"gs://projectname/spark-streaming-kafka-0-10_2.11-2.2.0.jar"
]
}
}
}
我尝试将 kafka 包作为 args 和 jar 文件传递。
这是我的代码(streaming.py):
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
sc = SparkContext()
spark = SparkSession.builder.master("local").appName("Spark-Kafka-Integration").getOrCreate()
# < ip > is masked
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "<ip>:9092") \
.option("subscribe", "rsvps") \
.option("startingOffsets", "earliest") \
.load()
df.printSchema()
Run Code Online (Sandbox Code Playgroud)
错误::java.lang.ClassNotFoundException:找不到数据源:kafka。请在http://spark.apache.org/third-party-projects.html找到软件包
完整跟踪: https: //pastebin.com/Uz3iGy2N
您可能会遇到这样的问题:“--packages”是语法糖,spark-submit当高级工具 (Dataproc) 以编程方式调用 Spark 提交时,它的交互效果很差,而我的回复中描述了另一种语法:使用外部库来自 google-dataproc 的 Spark 集群中的 pyspark 作业
长话短说,您可以在 Dataproc 请求中properties指定等效项,而不是传递作业参数。spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0--properties