找不到数据源:com.mongodb.spark.sql.DefaultSource

roo*_*kit 8 mongodb apache-spark pyspark

我正在尝试将 spark (pyspark) 连接到 mongodb,如下所示:

conf = SparkConf()
conf.set('spark.mongodb.input.uri', default_mongo_uri)
conf.set('spark.mongodb.output.uri', default_mongo_uri)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = SparkSession \
    .builder \
    .appName("my-app") \
    .config("spark.mongodb.input.uri", default_mongo_uri) \
    .config("spark.mongodb.output.uri", default_mongo_uri) \
    .getOrCreate()
Run Code Online (Sandbox Code Playgroud)

但是当我执行以下操作时:

users = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("uri", '{uri}.{col}'.format(uri=mongo_uri, col='users')).load()
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

java.lang.ClassNotFoundException:找不到数据源:com.mongodb.spark.sql.DefaultSource

我从 pyspark shell 做了同样的事情,我能够检索数据。这是我运行的命令:

pyspark --conf "spark.mongodb.input.uri=mongodb_uri" --conf "spark.mongodb.output.uri=mongodburi" --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2
Run Code Online (Sandbox Code Playgroud)

但是在这里我们可以选择指定我们需要使用的包。但是独立的应用程序和脚本呢?我如何在那里配置 mongo-spark-connector。

有任何想法吗?

Kon*_* K. 7

这是我在 Jupyter notebook 中的做法:
1. 从中央或任何其他存储库下载 jars 并将它们放在名为“jars”的目录中:
mongo-spark-connector_2.11-2.4.0
mongo-java-driver-3.9.0
2 . 创建会话并写入/读取任何数据

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

working_directory = 'jars/*'

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection") \
    .config("spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection") \
    .config('spark.driver.extraClassPath', working_directory) \
    .getOrCreate()

people = my_spark.createDataFrame([("JULIA", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77),
                            ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", 22)], ["name", "age"])

people.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.select('*').where(col("name") == "JULIA").show()
Run Code Online (Sandbox Code Playgroud)

结果你会看到这个:
在此处输入图片说明


小智 5

如果您使用的是SparkContext & SparkSession,您在 SparkConf 中提到了连接器 jar 包,请检查以下代码:

    from pyspark import SparkContext,SparkConf
    conf = SparkConf().set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.3.2")
    sc = SparkContext(conf=conf)

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017/sample1.zips") \
    .config("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017/sample1.zips") \
    .getOrCreate()

    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
    df.printSchema()
Run Code Online (Sandbox Code Playgroud)

如果您只使用SparkSession,请使用以下代码:

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017/sample1.zips") \
    .config("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017/sample1.zips") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.2') \
    .getOrCreate()

    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
    df.printSchema()
Run Code Online (Sandbox Code Playgroud)


小智 -1

您没有使用 sc 创建 SparkSession。也许这段代码可以帮助你:

conf.set('spark.mongodb.input.uri', mongodb_input_uri)
conf.set('spark.mongodb.input.collection', 'collection_name')
conf.set('spark.mongodb.output.uri', mongodb_output_uri)
sc = SparkContext(conf=conf)
spark = SparkSession(sc) # Using the context (conf) to create the session
Run Code Online (Sandbox Code Playgroud)