让Spark,Python和MongoDB协同工作

Phi*_*ien 35 python hadoop mongodb pymongo apache-spark

我很难让这些组件正确编织在一起.我已经安装了Spark并成功运行,我可以在本地运行作业,独立运行,也可以通过YARN运行.我已经遵循了这里这里建议的步骤(据我所知)

我正在研究Ubuntu和我拥有的各种组件版本

我在执行各种步骤时遇到了一些困难,例如哪些罐子添加到哪条路径,所以我添加的是

  • /usr/local/share/hadoop-2.6.1/share/hadoop/mapreduce 我添加 mongo-hadoop-core-1.5.0-SNAPSHOT.jar
  • 以下环境变量
    • export HADOOP_HOME="/usr/local/share/hadoop-2.6.1"
    • export PATH=$PATH:$HADOOP_HOME/bin
    • export SPARK_HOME="/usr/local/share/spark-1.5.1-bin-hadoop2.6"
    • export PYTHONPATH="/usr/local/share/mongo-hadoop/spark/src/main/python"
    • export PATH=$PATH:$SPARK_HOME/bin

我的Python程序是基本的

from pyspark import SparkContext, SparkConf
import pymongo_spark
pymongo_spark.activate()

def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)
    rdd = sc.mongoRDD(
        'mongodb://username:password@localhost:27017/mydb.mycollection')

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

我正在使用该命令运行它

$SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py
Run Code Online (Sandbox Code Playgroud)

我得到了以下输出结果

Traceback (most recent call last):
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 24, in <module>
    main()
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 17, in main
    rdd = sc.mongoRDD('mongodb://username:password@localhost:27017/mydb.mycollection')
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 161, in mongoRDD
    return self.mongoPairRDD(connection_string, config).values()
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 143, in mongoPairRDD
    _ensure_pickles(self)
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 80, in _ensure_pickles
    orig_tb)
py4j.protocol.Py4JError
Run Code Online (Sandbox Code Playgroud)

根据这里

当Java客户端代码中发生异常时,会引发此异常.例如,如果您尝试从空堆栈中弹出元素.抛出的Java异常的实例存储在java_exception成员中.

pymongo_spark.py它说,查看源代码和抛出错误的行

"与JVM通信时出错.Spark的CLASSPATH上有MongoDB Spark jar吗?:"

所以作为回应,我试图确保正确的罐子正在通过,但我可能会做错了,见下文

$SPARK_HOME/bin/spark-submit --jars /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar --driver-class-path /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar --master local[4] ~/sparkPythonExample/SparkPythonExample.py
Run Code Online (Sandbox Code Playgroud)

我已导入pymongo到同一个python程序,以验证我至少可以使用它来访问MongoDB,我可以.

我知道这里有很多活动部件,所以如果我能提供更多有用的信息,请告诉我.

zer*_*323 15

更新:

2016年7月4日

自上次更新以来,MongoDB Spark Connector已经成熟了很多.它提供了最新的二进制文件和基于数据源的API,但它使用的是SparkConf配置,因此它在主观上不如Stratio/Spark-MongoDB灵活.

2016年3月30日

从最初的答案开始,我发现了两种从Spark连接到MongoDB的不同方法:

虽然前者似乎相对不成熟,但后者看起来比Mongo-Hadoop连接器更好,并且提供了Spark SQL API.

# Adjust Scala and package version according to your setup
# although officially 0.11 supports only Spark 1.5
# I haven't encountered any issues on 1.6.1
bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.11:0.11.0
Run Code Online (Sandbox Code Playgroud)
df = (sqlContext.read
  .format("com.stratio.datasource.mongodb")
  .options(host="mongo:27017", database="foo", collection="bar")
  .load())

df.show()

## +---+----+--------------------+
## |  x|   y|                 _id|
## +---+----+--------------------+
## |1.0|-1.0|56fbe6f6e4120712c...|
## |0.0| 4.0|56fbe701e4120712c...|
## +---+----+--------------------+
Run Code Online (Sandbox Code Playgroud)

它似乎比mongo-hadoop-spark没有静态配置支持谓词下推更稳定,并且只是工作.

原答案:

实际上,这里有很多活动部件.我试图通过构建一个简单匹配描述配置的简单Docker镜像来使其更易于管理(尽管我已经省略了Hadoop库).您可以GitHub(DOI 10.5281/zenodo.47882)找到完整的源代码并从头开始构建它:

git clone https://github.com/zero323/docker-mongo-spark.git
cd docker-mongo-spark
docker build -t zero323/mongo-spark .
Run Code Online (Sandbox Code Playgroud)

或者下载我推送到Docker Hub的图像,这样你就可以了docker pull zero323/mongo-spark:

开始图片:

docker run -d --name mongo mongo:2.6
docker run -i -t --link mongo:mongo zero323/mongo-spark /bin/bash
Run Code Online (Sandbox Code Playgroud)

启动PySpark shell传递--jars--driver-class-path:

pyspark --jars ${JARS} --driver-class-path ${SPARK_DRIVER_EXTRA_CLASSPATH}
Run Code Online (Sandbox Code Playgroud)

最后看看它是如何工作的:

import pymongo
import pymongo_spark

mongo_url = 'mongodb://mongo:27017/'

client = pymongo.MongoClient(mongo_url)
client.foo.bar.insert_many([
    {"x": 1.0, "y": -1.0}, {"x": 0.0, "y": 4.0}])
client.close()

pymongo_spark.activate()
rdd = (sc.mongoRDD('{0}foo.bar'.format(mongo_url))
    .map(lambda doc: (doc.get('x'), doc.get('y'))))
rdd.collect()

## [(1.0, -1.0), (0.0, 4.0)]
Run Code Online (Sandbox Code Playgroud)

请注意,mongo-hadoop似乎在第一次操作后关闭连接.因此rdd.count(),在collect之后调用示例将抛出异常.

根据不同的问题,我以前遇到过这种创建图片我倾向于相信,通过 mongo-hadoop-1.5.0-SNAPSHOT.jarmongo-hadoop-spark-1.5.0-SNAPSHOT.jar 这两个 --jars--driver-class-path 是唯一的硬性要求.

备注: