如何使用MongoDB中的过滤记录构建Spark数据框?

Rub*_*wal 2 mongodb mongodb-query apache-spark pyspark

我的应用程序是使用MongoDB作为平台构建的.DB中的一个集合具有大量数据,并且已选择通过计算检索和生成分析数据的apache spark.我已经为MongoDB配置了Spark Connector以与MongoDB进行通信.我需要使用pyspark查询MongoDB集合并构建一个由mongodb查询的结果集组成的数据帧.请建议我一个合适的解决方案.

Ros*_*oss 5

您可以将数据直接加载到数据框中,如下所示:

# Create the dataframe
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/mydb.mycoll").load()

# Filter the data via the api
df.filter(people.age > 30)

# Filter via sql
df.registerTempTable("people")
over_thirty = sqlContext.sql("SELECT name, age FROM people WHERE age > 30")
Run Code Online (Sandbox Code Playgroud)

有关更多信息,请参阅Mongo Spark连接器Python API部分或introduction.py.SQL查询被转换并传递回连接器,以便在发送到spark集群之前可以在MongoDB中查询数据.

在将结果返回到Spark之前,您还可以提供自己的聚合管道以应用于集合:

dfr = sqlContext.read.option("pipeline", "[{ $match: { name: { $exists: true } } }]")
df = dfr.option("uri", ...).format("com.mongodb.spark.sql.DefaultSource").load()
Run Code Online (Sandbox Code Playgroud)