我可以检查sqlalchemy查询对象以查找已连接的表吗?

Hoo*_*pes 10 python join sqlalchemy flask-sqlalchemy

我正在尝试以编程方式构建搜索查询,为此,我正在加入一个表.

class User(db.Model):
    id = db.Column(db.Integer(), primary_key=True)

class Tag(db.Model):
    id = db.Column(db.Integer(), primary_key=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
    title = db.Column(db.String(128))
    description = db.Column(db.String(128))
Run Code Online (Sandbox Code Playgroud)

这是一个人为的例子 - 我希望它有意义.

说我的搜索功能看起来像:

def search(title_arg, desc_arg):
    query = User.query
    if title_arg:
        query = query.join(Tag)
        query = query.filter(Tag.title.contains(title_arg))
    if desc_arg:
        query = query.join(Tag)
        query = query.filter(Tag.description.contains(desc_arg))

    return query
Run Code Online (Sandbox Code Playgroud)

以前,我一直跟踪已经在列表中加入的表,如果表在列表中,假设它已经加入,只需添加过滤器.

如果我可以查看查询对象,看看它Tag已经加入了,那就太酷了,如果是的话就跳过它.我有一些更复杂的查询构建,真正从中受益.

如果对于我错过的搜索构建查询的策略完全不同,那也会很棒.或者,如果上面的代码没问题,如果我两次加入表格,这也是很好的信息.任何帮助都非常感激!!!

r-m*_*m-n 12

你可以找到联合表格 query._join_entities

joined_tables = [mapper.class_ for mapper in query._join_entities]
Run Code Online (Sandbox Code Playgroud)

  • 这一直有效,直到 SQLAlchemy 1.3,但 _join_entities` 似乎不再存在于 1.4 中。 (6认同)
  • 根据@mtoloo,`joined_tables` 也可以是`[mapper.entity for mapper in query._join_entities]`。 (2认同)

mto*_*loo 8

根据 rmn 的回答:

在项目初始化的某些地方,unique_joinsqlalchemy.orm.Query对象添加一个方法,如下所示:

def unique_join(self, *props, **kwargs):
    if props[0] in [c.entity for c in self._join_entities]:
        return self
    return self.join(*props, **kwargs)

Query.unique_join = unique_join
Run Code Online (Sandbox Code Playgroud)

现在使用query.unique_join代替query.join

query = query.unique_join(Tag)
Run Code Online (Sandbox Code Playgroud)


jel*_*ver 7

从 SQLAlchemy 1.4 开始,早期提出的解决方案包括_join_entities不再起作用。

SQLAlchemy 1.4

我试图在 SQLAlchemy 1.4 中解决这个问题,但有一个警告:

  • 这种方法包括查询中的所有实体,因此不仅连接实体
from sqlalchemy.sql import visitors
from contextlib import suppress

def _has_entity(self, model) -> bool:
    for visitor in visitors.iterate(self.statement):
        # Checking for `.join(Parent.child)` clauses
        if visitor.__visit_name__ == 'binary':
            for vis in visitors.iterate(visitor):
                # Visitor might not have table attribute
                with suppress(AttributeError):
                    # Verify if already present based on table name
                    if model.__table__.fullname == vis.table.fullname:
                        return True
        # Checking for `.join(Child)` clauses
        if visitor.__visit_name__ == 'table':
            # Visitor might be of ColumnCollection or so, 
            # which cannot be compared to model
            with suppress(TypeError):
                if model == visitor.entity_namespace:
                    return True
    return False

def unique_join(self, model, *args, **kwargs):
    """Join if given model not yet in query"""
    if not self._has_entity(model):
        self = self.join(model, *args, **kwargs)
    return self

Query._has_entity = _has_entity
Query.unique_join = unique_join
Run Code Online (Sandbox Code Playgroud)

SQLAlchemy <= 1.3

对于 SQLAlchemy 1.3 及之前版本,@mtoloo 和 @rmn 有完美的答案,为了完整起见,我将它们包括在内。

在项目初始化的某些地方,向 sqlalchemy.orm.Query 对象添加一个 unique_join 方法,如下所示:

def unique_join(self, *props, **kwargs):
    if props[0] in [c.entity for c in self._join_entities]:
        return self
    return self.join(*props, **kwargs)
Run Code Online (Sandbox Code Playgroud)

现在使用 query.unique_join 而不是 query.join:

Query.unique_join = unique_join
Run Code Online (Sandbox Code Playgroud)