标签: motorengine

带有 FastAPI 的 MongoDb

我正在使用FastAPI并希望将其连接到 MongoDB 数据库。然而,我很困惑在异步电机和 mongoengine 之间选择哪个 ODM。此外,在此处的 NoSQL 示例中他们创建了一个新存储桶,并在每次使用时调用了连接到 db 的代码。但是,motor 和 mongoengine 似乎都更喜欢全局连接。那么什么是连接到 mongodb 的好方法呢?

mongodb mongoengine tornado-motor motorengine fastapi

12
推荐指数
2
解决办法
1万
查看次数

附加到不同循环的未来任务

我正在尝试在 FastAPI 中连接到 mongodb。我反复收到此异常。

文件 - main.py

app = FastAPI(
    title=config.PROJECT_NAME, docs_url="/api/docs", openapi_url="/api"
)

@app.get("/api/testing")
async def testit():
    user_collection = readernetwork_db.get_collection("user_collection")
    all_users = await user_collection.find_one({"email": "sample_email"})
    print("all users --- ", all_users)
    return all_users

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", reload=True, port=8888)
Run Code Online (Sandbox Code Playgroud)

文件 - session.py

import motor.motor_asyncio
from app.core import config


print("here we go again....")
client = motor.motor_asyncio.AsyncIOMotorClient(
    config.MONGOATLAS_DATABASE_URI)
readernetwork_db = client.get_database("readernetwork")
Run Code Online (Sandbox Code Playgroud)

例外 -:

all_users = await user_collection.find_one({"email": "sample_email"})

RuntimeError: Task <Task pending name='Task-4' coro=<RequestResponseCycle.run_asgi() running at /usr/local/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py:389> cb=[set.discard()]> got Future …
Run Code Online (Sandbox Code Playgroud)

python motordriver python-asyncio motorengine fastapi

8
推荐指数
1
解决办法
1208
查看次数

如何获取 MongoDB 的 Motor 驱动程序?

我想与 Motor 的潜水员进行计数,但出现此错误。

AttributeError: 'AsyncIOMotorCursor' object has no attribute 'count'

这是我的代码:

await MOTOR_CURSOR.users.find().count()
Run Code Online (Sandbox Code Playgroud)

python mongodb pymongo motorengine

5
推荐指数
1
解决办法
4889
查看次数

Python 中的抽象方法异步和同步实现

假设我有BaseClass其中包含一些逻辑,其中和main_function()都是通用的。假设这两个类有其独特的实现,前者以同步方式实现,而后者以异步方式实现。我写了这样的东西,它似乎有效:SyncClassAsyncClassget_data()

class BaseClass:
    def get_data():
        pass

    @gen.coroutine
    def main_function():
        # some logic
        try:
            data = yield self.get_data()
        except:
            data = self.get_data()
        # some more logic

class SyncClass(BaseClass):
    def get_data():
        //makes sync call to Mongo and gets data (using Mongoengine)

class AsyncClass(BaseClass):
    @gen.coroutine
    def get_data():
        //makes async call to Mongo and gets data (using Motorengine)
Run Code Online (Sandbox Code Playgroud)

我使用此代码作为解决方法,因为我已经以这种方式实现了 get_data() 的这些方法。有一些更优雅的解决方案吗?我的代码有两部分与我有关:

try:
    data = yield self.get_data()
except:
    data = self.get_data()
Run Code Online (Sandbox Code Playgroud)

我不想在这里使用 try/ except 。

另一件事是:我@gen.coroutine在 …

python asynchronous tornado mongoengine motorengine

3
推荐指数
1
解决办法
1946
查看次数