Unable to connect to MongoDB via Spark

Ran*_*n P 2 mongodb python-2.7 apache-spark apache-spark-sql pyspark

I am trying to read data from MongoDB through an Apache Spark master.

I am using 3 machines:

  • M1 - runs a MongoDB instance
  • M2 - runs the Spark master, with the Mongo connector, on it
  • M3 - runs a Python application that connects to the Spark master on M2

The application (M3) connects to the Spark master like this:

_sparkSession = SparkSession.builder.master(masterPath).appName(appName)\
.config("spark.mongodb.input.uri", "mongodb://10.0.3.150/db1.data.coll")\
.config("spark.mongodb.output.uri", "mongodb://10.0.3.150/db1.data.coll").getOrCreate()

The application (M3) then tries to read data from the database:

sqlContext = SQLContext(_sparkSession.sparkContext)
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri", "mongodb://user:pass@10.0.3.150/db1.data?readPreference=primaryPreferred")\
    .load()

But it fails with this exception:

    py4j.protocol.Py4JJavaError: An error occurred while calling o56.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.mongodb.spark.sql.DefaultSource. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:594)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.mongodb.spark.sql.DefaultSource.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:579)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:579)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:579)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:579)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:579)
        ... 16 more

ken*_*yut 11

I am a pyspark user; here is what my code looks like, and it works:

MongoDB connection configuration in pyspark:

# For spark version < 3.0
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .master('local')\
    .config('spark.mongodb.input.uri', 'mongodb://user:password@ip.x.x.x:27017/database01.data.coll')\
    .config('spark.mongodb.output.uri', 'mongodb://user:password@ip.x.x.x:27017/database01.data.coll')\
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.1')\
    .getOrCreate()
# For spark version >= 3.0
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .master('local')\
    .config('spark.mongodb.input.uri', 'mongodb://user:password@ip.x.x.x:27017/database01.coll')\
    .config('spark.mongodb.output.uri', 'mongodb://user:password@ip.x.x.x:27017/database01.coll')\
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')\
    .getOrCreate()

Reading from MongoDB:

df01 = spark.read\
    .format("com.mongodb.spark.sql.DefaultSource")\
    .option("database","database01")\
    .option("collection", "collection01")\
    .load()

Writing to MongoDB:

df01.write.format("com.mongodb.spark.sql.DefaultSource")\
    .mode("overwrite")\
    .option("database","database01")\
    .option("collection", "collection02")\
    .save()

  • This should be the accepted answer. The `spark.jars.packages` option is documented at https://spark.apache.org/docs/2.1.0/configuration.html#runtime-environment (2 upvotes)

Ros*_*oss 9

Spark cannot find the com.mongodb.spark.sql.DefaultSource package, hence the error message.

Everything else looks good; you just need to include the Mongo Spark package:

> $SPARK_HOME/bin/pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0

Or make sure the jar file is on the correct path.
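When the SparkSession is created programmatically from a standalone Python script (as in the question), rather than via the `pyspark` shell, one alternative sketch is to set `PYSPARK_SUBMIT_ARGS` before the session is built; pyspark forwards it to spark-submit. The package coordinate below is the one from this answer, and the trailing `pyspark-shell` token is required by pyspark's launcher:

```python
import os

# Must be set BEFORE SparkSession.builder...getOrCreate() is called,
# because the JVM gateway reads it once at startup.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0 "
    "pyspark-shell"
)
```

After this, the SparkSession can be created exactly as in the question, without changing the builder code itself.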

Be sure to check which version of the Mongo-Spark package your Spark version requires: https://spark-packages.org/package/mongodb/mongo-spark
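As a rough illustration of that version pairing, here is a hypothetical helper (the function name is my own, not part of the connector) that maps a Spark version to the connector coordinates used in the answers above; Spark 3.x distributions are built against Scala 2.12, while Spark 2.x distributions typically use Scala 2.11:

```python
# Hypothetical helper: pick a mongo-spark-connector Maven coordinate
# matching the Spark major version. Versions are taken from the answers
# above; always confirm against the official compatibility table.
def mongo_connector_coordinate(spark_version: str) -> str:
    major = int(spark_version.split(".")[0])
    if major >= 3:
        # Spark 3.x is built against Scala 2.12
        return "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1"
    # Spark 2.x is typically built against Scala 2.11
    return "org.mongodb.spark:mongo-spark-connector_2.11:2.3.1"
```

The returned string can be passed to `--packages` or `spark.jars.packages` directly.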