Posts by Kar*_*ang

PySpark Structured Streaming with Kafka - py4j.protocol.Py4JJavaError: An error occurred while calling o41.save

I have a simple PySpark program that publishes data to Kafka. When I run it with spark-submit, it fails with the error shown below.
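For context, a program of this kind might look roughly like the sketch below. The original script is not shown in the post, so everything here is an assumption for illustration: the function names `kafka_write_options` and `publish_rows`, the two-column `key`/`value` layout, and the app name are hypothetical, and any SSL settings the real script uses are left out. Only `df.write.format('kafka')` followed by `save()` is confirmed by the traceback.

```python
def kafka_write_options(bootstrap_servers, topic):
    """Build the two basic options for a batch write to Spark's Kafka sink.

    Hypothetical helper; the real script's options are not shown in the post.
    """
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "topic": topic,
    }


def publish_rows(rows, bootstrap_servers, topic):
    """Write (key, value) tuples to a Kafka topic as a batch job.

    Requires pyspark and the spark-sql-kafka package on the classpath.
    """
    # Imported lazily so kafka_write_options can be used without Spark installed.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.appName("kafka-publish-sketch").getOrCreate()
    df = spark.createDataFrame(rows, ["key", "value"])

    # The Kafka sink expects string or binary 'key' and 'value' columns.
    (df.select(col("key").cast("string"), col("value").cast("string"))
       .write.format("kafka")
       .options(**kafka_write_options(bootstrap_servers, topic))
       .save())
```

The `save()` call at the end is the step that fails in the traceback.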

Command being run:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.2.0 ~/PycharmProjects/Kafka/PySpark_Kafka_SSL.py

Error:

Traceback (most recent call last):
  File "/Users/karanalang/PycharmProjects/Kafka/PySpark_Kafka_SSL.py", line 33, in <module>
    df.write.format('kafka')\
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 738, in save
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/py4j-0.10.9.2-src.zip/py4j/java_gateway.py", line 1309, in __call__
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/py4j-0.10.9.2-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o41.save.
: java.lang.NoClassDefFoundError: scala/$less$colon$less
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:180)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) …
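A note on reading this error: `scala/$less$colon$less` is the JVM-encoded name of `scala.<:<`, which exists as a top-level class in Scala 2.13 but not in Scala 2.12. A `NoClassDefFoundError` for it usually suggests that a library compiled for Scala 2.13 is running on a Scala 2.12 runtime. The command above requests the `_2.13` build of the Kafka connector, while the default `spark-3.2.0-bin-hadoop3.2` download is built against Scala 2.12 (an assumption about this installation; `spark-submit --version` prints the Scala version in use). The sketch below, with hypothetical helper names `scala_suffix` and `kafka_package`, shows how the `--packages` coordinate can be checked against, or built from, the Scala version Spark reports.

```python
import re


def scala_suffix(package_coordinate):
    """Return the Scala binary version embedded in a Maven coordinate.

    For 'group:artifact_2.13:version' this returns '2.13'; returns None
    if the artifact name carries no Scala suffix.
    """
    _group, artifact, _version = package_coordinate.split(":")
    match = re.search(r"_(\d+\.\d+)$", artifact)
    return match.group(1) if match else None


def kafka_package(scala_binary_version, spark_version):
    """Build the spark-sql-kafka coordinate for a given Scala and Spark version."""
    return (
        "org.apache.spark:spark-sql-kafka-0-10_"
        f"{scala_binary_version}:{spark_version}"
    )


# The coordinate from the failing command asks for a Scala 2.13 build.
requested = "org.apache.spark:spark-sql-kafka-0-10_2.13:3.2.0"
print(scala_suffix(requested))

# Assuming Spark reports Scala 2.12, the matching coordinate would be:
print(kafka_package("2.12", "3.2.0"))
```

If the installation does report Scala 2.12, passing the second coordinate to `--packages` instead of the `_2.13` one would be the first thing to try.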

python apache-kafka apache-spark pyspark

Score: 2 · Answers: 1 · Views: 20k
