小编And*_*kin的帖子

如果两个SQLAlchemy查询相同,该如何比较?

我有一个返回SQLAlchemy查询对象的函数,并且我想测试此函数以构建正确的查询。

例如:

import sqlalchemy

metadata = sqlalchemy.MetaData()
users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("email", sqlalchemy.String(255), nullable=False, unique=True),
    sqlalchemy.Column("username", sqlalchemy.String(50), nullable=False, unique=True),
)

def select_first_users(n):
    return users.select().limit(n)

def test_queries_are_equal(self):
    expected_query = users.select().limit(10)
    assert select_first_users(10) == expected_query  # fails here
    assert select_first_users(10).compare(expected_query)  # fails here too
Run Code Online (Sandbox Code Playgroud)

我不知道如何比较两个查询是否相等。==在这里不起作用,因为据我所见,这个对象没有__eq__定义方法,因此它按内存中的地址比较对象,肯定会失败。该compare方法is进行比较。

我看到的唯一解决方案是:

assert str(q1.compile()) == str(q2.compile())
Run Code Online (Sandbox Code Playgroud)

,但很奇怪,它包含占位符而不是实际值。

那么,如何比较两个SQLAlchemy查询的相等性呢?

我使用Python 3.7.4, SQLAlchemy==1.3.10

python unit-testing sqlalchemy

7
推荐指数
1
解决办法
121
查看次数

pydantic 与 mypy 的使用

我正在尝试使用 FastAPI 编写一个应用程序,该应用程序大量使用 pydantic。此外,我想使用mypy. 如何在没有冲突的情况下为 pydantic 和 mypy 使用类型注释?

我知道type: ignore评论,但在我看来这是某种作弊:)

例子:

from pydantic import BaseModel, Schema


class UsersQuery(BaseModel):
    limit: int = Schema(default=100, gt=0, le=100)
    offset: int = Schema(default=0, ge=0)
Run Code Online (Sandbox Code Playgroud)

此代码工作正常,但类型检查失败。

我的输出:

error: Incompatible types in assignment (expression has type "Schema", variable has type "int")
error: Incompatible types in assignment (expression has type "Schema", variable has type "int")
Run Code Online (Sandbox Code Playgroud)

python mypy pydantic fastapi

4
推荐指数
1
解决办法
519
查看次数

在Python中加入多个异步生成器

我想侦听来自同一对象的多个实例的事件,然后将此事件流合并为一个流。例如,如果我使用异步生成器:

class PeriodicYielder: 
    def __init__(self, period: int) -> None: 
        self.period = period 

    async def updates(self): 
        while True: 
            await asyncio.sleep(self.period)
            yield self.period
Run Code Online (Sandbox Code Playgroud)

我可以成功监听一个实例的事件:

async def get_updates_from_one(): 
    each_1 = PeriodicYielder(1) 
    async for n in each_1.updates(): 
        print(n)
# 1
# 1
# 1
# ...
Run Code Online (Sandbox Code Playgroud)

但是,如何从多个异步生成器获取事件?换句话说:如何按多个异步生成器准备好产生下一个值的顺序进行迭代?

async def get_updates_from_multiple(): 
    each_1 = PeriodicYielder(1) 
    each_2 = PeriodicYielder(2) 
    async for n in magic_async_join_function(each_1.updates(), each_2.updates()): 
        print(n)
# 1
# 1
# 2
# 1
# 1
# 2
# ...
Run Code Online (Sandbox Code Playgroud)

stdlib或3rd party模块中是否有这种magic_async_join_function

python python-asyncio

3
推荐指数
2
解决办法
524
查看次数